Skip to content

Commit

Permalink
feat: 🎸 progress on writable stream
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Jun 20, 2023
1 parent cc55037 commit a568afd
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 23 deletions.
2 changes: 1 addition & 1 deletion src/fsa-to-node/FsaNodeFs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,7 @@ export class FsaNodeFs implements FsCallbackApi, FsSynchronousApi, FsCommonObjec
: ({ encoding: options } as opts.IWriteStreamOptions);
const filename = pathToFilename(path);
const location = pathToLocation(filename);
const flags = flagsToNumber(optionsObj.flags);
const flags = flagsToNumber(optionsObj.flags ?? 'w');
const createIfMissing = !!(flags & FLAG.O_CREAT);
const handle = this.getFile(location[0], location[1], 'createWriteStream', createIfMissing);
return new FsaNodeWriteStream(handle, filename, optionsObj);
Expand Down
106 changes: 88 additions & 18 deletions src/fsa-to-node/FsaNodeWriteStream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Writable } from 'stream';
import { Defer } from 'thingies/es6/Defer';
import type { IFileSystemFileHandle } from '../fsa/types';
import { codeMutex } from 'thingies/es6/codeMutex';
import type { IFileSystemFileHandle, IFileSystemWritableFileStream } from '../fsa/types';
import type { IWriteStream } from '../node/types/misc';
import type { IWriteStreamOptions } from '../node/types/options';

Expand All @@ -21,24 +22,27 @@ import type { IWriteStreamOptions } from '../node/types/options';
* swap file), but that is the trade-off we have to make.
*/
export class FsaNodeWriteStream extends Writable implements IWriteStream {
protected __pending: boolean = true;
protected ready = new Defer<void>();
protected closed: boolean = false;
protected __pending__: boolean = true;
protected __stream__: Promise<IFileSystemWritableFileStream>;
protected __closed__: boolean = false;
protected readonly __mutex__ = codeMutex();

public constructor(
protected readonly handle: Promise<IFileSystemFileHandle>,
handle: Promise<IFileSystemFileHandle>,
public readonly path: string,
protected readonly options?: IWriteStreamOptions,
) {
super();
handle
.then(() => {
this.__pending = false;
this.ready.resolve();
})
.catch(error => {
this.ready.reject(error);
});
const stream = new Defer<IFileSystemWritableFileStream>();
this.__stream__ = stream.promise;
(async () => {
const fsaHandle = await handle;
const writable = await fsaHandle.createWritable({keepExistingData: true});
this.__pending__ = false;
stream.resolve(writable);
})().catch(error => {
stream.reject(error);
});
}

// ------------------------------------------------------------- IWriteStream
Expand All @@ -48,16 +52,82 @@ export class FsaNodeWriteStream extends Writable implements IWriteStream {
}

public get pending(): boolean {
return this.__pending;
return this.__pending__;
}

public close(): void {}
public close(cb): void {
if (cb) this.once('close', cb);
if (this.__closed__) {
process.nextTick(() => this.emit('close'));
return;
}
this.__closed__ = true;
(async () => {
try {
const writable = await this.__stream__;
this.__mutex__(async () => {
console.log('closing')
await writable.close();
});
this.emit('close');
} catch (error) {
this.emit('error', error);
this.emit('close', error);
}
})().catch(() => {});
}

// ----------------------------------------------------------------- Writable

_write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {}
private async ___write___(buffers: Buffer[]): Promise<void> {
const writable = await this.__stream__;
this.__mutex__(async () => {
for (const buffer of buffers) {
try {
console.log(1);
await writable.write(buffer);
console.log(2);
} catch (error) {
console.log('err', error);
}
}
});
}

_write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {
(async () => {
try {
await this.___write___([chunk]);
callback(null);
} catch (error) {
callback(error);
}
})().catch(() => {});
}

_writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void {}
_writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void {
(async () => {
try {
const buffers = chunks.map(({chunk}) => chunk);
await this.___write___(buffers);
callback(null);
} catch (error) {
callback(error);
}
})().catch(() => {});
}

_final(callback: (error?: Error | null) => void): void {}
_final(callback: (error?: Error | null) => void): void {
(async () => {
try {
const writable = await this.__stream__;
this.__mutex__(async () => {
await writable.close();
});
callback(null);
} catch (error) {
callback(error);
}
})().catch(() => {});
}
}
14 changes: 14 additions & 0 deletions src/fsa-to-node/__tests__/FsaNodeFs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -631,3 +631,17 @@ describe('.read()', () => {
expect(buffer[1]).toBe(115);
});
});

describe('.createWriteStream()', () => {
test.only('can use stream to write to a file', async () => {
const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' });
const stream = fs.createWriteStream('/folder/file');
stream.write('a');
const onClose = new Promise(resolve => stream.on('close', (err) => {
resolve(err);
}));
stream.close();
await onClose;
console.log(mfs.__vol.toJSON());
});
});
13 changes: 11 additions & 2 deletions src/node-to-fsa/NodeFileSystemWritableFileStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,15 @@ export class NodeFileSystemWritableFileStream extends WritableStream implements
}

protected async writeBase(chunk: Data): Promise<void> {
console.log('base')
const writer = this.getWriter();
console.log('writer')
try {
await writer.write(chunk);
console.log('write', chunk)
await writer.write(new Uint8Array([123]));
console.log('done')
} finally {
console.log('finally');
writer.releaseLock();
}
}
Expand All @@ -147,7 +152,11 @@ export class NodeFileSystemWritableFileStream extends WritableStream implements
case DataView:
return this.writeBase(params);
default: {
if (ArrayBuffer.isView(params)) return this.writeBase(params);
if (ArrayBuffer.isView(params)) {

console.log('ERIT', params)
return this.writeBase(params);
}
else {
const options = params as FileSystemWritableFileStreamParams;
switch (options.type) {
Expand Down
2 changes: 1 addition & 1 deletion src/node/types/misc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ export interface IWriteStream extends Writable {
bytesWritten: number;
path: string;
pending: boolean;
close();
close(callback?: (err?: Error) => void): void;
}

export interface IFSWatcher extends EventEmitter {
Expand Down
11 changes: 10 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,14 @@
"declaration": true
},
"include": ["src"],
"exclude": ["src/__tests__"]
"exclude": [
"src/__tests__",
"node_modules",
"lib",
"es6",
"es2020",
"esm",
"docs",
"README.md"
]
}

0 comments on commit a568afd

Please sign in to comment.