From a568afd37a5cdfa91945c4074a8b326d391f0573 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Sun, 18 Jun 2023 02:26:41 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20progress=20on=20writable?= =?UTF-8?q?=20stream?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/fsa-to-node/FsaNodeFs.ts | 2 +- src/fsa-to-node/FsaNodeWriteStream.ts | 106 +++++++++++++++--- src/fsa-to-node/__tests__/FsaNodeFs.test.ts | 14 +++ .../NodeFileSystemWritableFileStream.ts | 13 ++- src/node/types/misc.ts | 2 +- tsconfig.json | 11 +- 6 files changed, 125 insertions(+), 23 deletions(-) diff --git a/src/fsa-to-node/FsaNodeFs.ts b/src/fsa-to-node/FsaNodeFs.ts index eeb258e43..ced5acf27 100644 --- a/src/fsa-to-node/FsaNodeFs.ts +++ b/src/fsa-to-node/FsaNodeFs.ts @@ -829,7 +829,7 @@ export class FsaNodeFs implements FsCallbackApi, FsSynchronousApi, FsCommonObjec : ({ encoding: options } as opts.IWriteStreamOptions); const filename = pathToFilename(path); const location = pathToLocation(filename); - const flags = flagsToNumber(optionsObj.flags); + const flags = flagsToNumber(optionsObj.flags ?? 'w'); const createIfMissing = !!(flags & FLAG.O_CREAT); const handle = this.getFile(location[0], location[1], 'createWriteStream', createIfMissing); return new FsaNodeWriteStream(handle, filename, optionsObj); diff --git a/src/fsa-to-node/FsaNodeWriteStream.ts b/src/fsa-to-node/FsaNodeWriteStream.ts index 3a28e3fcc..f3fb027b0 100644 --- a/src/fsa-to-node/FsaNodeWriteStream.ts +++ b/src/fsa-to-node/FsaNodeWriteStream.ts @@ -1,6 +1,7 @@ import { Writable } from 'stream'; import { Defer } from 'thingies/es6/Defer'; -import type { IFileSystemFileHandle } from '../fsa/types'; +import { codeMutex } from 'thingies/es6/codeMutex'; +import type { IFileSystemFileHandle, IFileSystemWritableFileStream } from '../fsa/types'; import type { IWriteStream } from '../node/types/misc'; import type { IWriteStreamOptions } from '../node/types/options'; @@ -21,24 +22,27 @@ import type { IWriteStreamOptions } from '../node/types/options'; * swap file), but that is the trade-off we have to make. */ export class FsaNodeWriteStream extends Writable implements IWriteStream { - protected __pending: boolean = true; - protected ready = new Defer(); - protected closed: boolean = false; + protected __pending__: boolean = true; + protected __stream__: Promise; + protected __closed__: boolean = false; + protected readonly __mutex__ = codeMutex(); public constructor( - protected readonly handle: Promise, + handle: Promise, public readonly path: string, protected readonly options?: IWriteStreamOptions, ) { super(); - handle - .then(() => { - this.__pending = false; - this.ready.resolve(); - }) - .catch(error => { - this.ready.reject(error); - }); + const stream = new Defer(); + this.__stream__ = stream.promise; + (async () => { + const fsaHandle = await handle; + const writable = await fsaHandle.createWritable({keepExistingData: true}); + this.__pending__ = false; + stream.resolve(writable); + })().catch(error => { + stream.reject(error); + }); } // ------------------------------------------------------------- IWriteStream @@ -48,16 +52,82 @@ export class FsaNodeWriteStream extends Writable implements IWriteStream { } public get pending(): boolean { - return this.__pending; + return this.__pending__; } - public close(): void {} + public close(cb): void { + if (cb) this.once('close', cb); + if (this.__closed__) { + process.nextTick(() => this.emit('close')); + return; + } + this.__closed__ = true; + (async () => { + try { + const writable = await this.__stream__; + this.__mutex__(async () => { + console.log('closing') + await writable.close(); + }); + this.emit('close'); + } catch (error) { + this.emit('error', error); + this.emit('close', error); + } + })().catch(() => {}); + } // ----------------------------------------------------------------- Writable - _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void {} + private async ___write___(buffers: Buffer[]): Promise { + const writable = await this.__stream__; + this.__mutex__(async () => { + for (const buffer of buffers) { + try { + console.log(1); + await writable.write(buffer); + console.log(2); + } catch (error) { + console.log('err', error); + } + } + }); + } + + _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void { + (async () => { + try { + await this.___write___([chunk]); + callback(null); + } catch (error) { + callback(error); + } + })().catch(() => {}); + } - _writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void {} + _writev(chunks: Array<{ chunk: any; encoding: string }>, callback: (error?: Error | null) => void): void { + (async () => { + try { + const buffers = chunks.map(({chunk}) => chunk); + await this.___write___(buffers); + callback(null); + } catch (error) { + callback(error); + } + })().catch(() => {}); + } - _final(callback: (error?: Error | null) => void): void {} + _final(callback: (error?: Error | null) => void): void { + (async () => { + try { + const writable = await this.__stream__; + this.__mutex__(async () => { + await writable.close(); + }); + callback(null); + } catch (error) { + callback(error); + } + })().catch(() => {}); + } } diff --git a/src/fsa-to-node/__tests__/FsaNodeFs.test.ts b/src/fsa-to-node/__tests__/FsaNodeFs.test.ts index 971b357d2..9604f2c6a 100644 --- a/src/fsa-to-node/__tests__/FsaNodeFs.test.ts +++ b/src/fsa-to-node/__tests__/FsaNodeFs.test.ts @@ -631,3 +631,17 @@ describe('.read()', () => { expect(buffer[1]).toBe(115); }); }); + +describe('.createWriteStream()', () => { + test.only('can use stream to write to a file', async () => { + const { fs, mfs } = setup({ folder: { file: 'test' }, 'empty-folder': null, 'f.html': 'test' }); + const stream = fs.createWriteStream('/folder/file'); + stream.write('a'); + const onClose = new Promise(resolve => stream.on('close', (err) => { + resolve(err); + })); + stream.close(); + await onClose; + console.log(mfs.__vol.toJSON()); + }); +}); diff --git a/src/node-to-fsa/NodeFileSystemWritableFileStream.ts b/src/node-to-fsa/NodeFileSystemWritableFileStream.ts index 63f112695..04df91a24 100644 --- a/src/node-to-fsa/NodeFileSystemWritableFileStream.ts +++ b/src/node-to-fsa/NodeFileSystemWritableFileStream.ts @@ -120,10 +120,15 @@ export class NodeFileSystemWritableFileStream extends WritableStream implements } protected async writeBase(chunk: Data): Promise { + console.log('base') const writer = this.getWriter(); + console.log('writer') try { - await writer.write(chunk); + console.log('write', chunk) + await writer.write(new Uint8Array([123])); + console.log('done') } finally { + console.log('finally'); writer.releaseLock(); } } @@ -147,7 +152,11 @@ export class NodeFileSystemWritableFileStream extends WritableStream implements case DataView: return this.writeBase(params); default: { - if (ArrayBuffer.isView(params)) return this.writeBase(params); + if (ArrayBuffer.isView(params)) { + + console.log('ERIT', params) + return this.writeBase(params); + } else { const options = params as FileSystemWritableFileStreamParams; switch (options.type) { diff --git a/src/node/types/misc.ts b/src/node/types/misc.ts index 5d7a6c56e..4ee3f1e64 100644 --- a/src/node/types/misc.ts +++ b/src/node/types/misc.ts @@ -94,7 +94,7 @@ export interface IWriteStream extends Writable { bytesWritten: number; path: string; pending: boolean; - close(); + close(callback?: (err?: Error) => void): void; } export interface IFSWatcher extends EventEmitter { diff --git a/tsconfig.json b/tsconfig.json index a1cab753f..38dc1a6be 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -11,5 +11,14 @@ "declaration": true }, "include": ["src"], - "exclude": ["src/__tests__"] + "exclude": [ + "src/__tests__", + "node_modules", + "lib", + "es6", + "es2020", + "esm", + "docs", + "README.md" + ] }