From 247caacc7ae454eec4b2911eec746d099335dd70 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Sun, 19 Feb 2023 12:30:07 +0530 Subject: [PATCH] stream: fix pipeline callback not called on ended stream Fixes: https://github.com/nodejs/node/issues/46595 PR-URL: https://github.com/nodejs/node/pull/46600 Reviewed-By: Robert Nagy Reviewed-By: Matteo Collina Reviewed-By: Luigi Pinca Reviewed-By: Benjamin Gruenbaum Reviewed-By: James M Snell Reviewed-By: Colin Ihrig --- lib/internal/streams/pipeline.js | 12 +++++++++-- test/parallel/test-stream-pipeline.js | 25 +++++++++++++++++++++++ test/parallel/test-webstreams-pipeline.js | 12 ++++++++++- 3 files changed, 46 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 667b5268477d42..22bb042d71b420 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -38,6 +38,7 @@ const { isTransformStream, isWebStream, isReadableStream, + isReadableEnded, } = require('internal/streams/utils'); const { AbortController } = require('internal/abort_controller'); @@ -417,10 +418,17 @@ function pipe(src, dst, finish, { end }) { // Compat. Before node v10.12.0 stdio used to throw an error so // pipe() did/does not end() stdio destinations. // Now they allow it but "secretly" don't close the underlying fd. - src.once('end', () => { + + function endFn() { ended = true; dst.end(); - }); + } + + if (isReadableEnded(src)) { // End the destination if the source has already ended. + process.nextTick(endFn); + } else { + src.once('end', endFn); + } } else { finish(); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index 65ef5164c14b4c..d37ca275f1dddf 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1591,3 +1591,28 @@ const tsp = require('timers/promises'); assert.strictEqual(writable.endCount, 1); })); } + +{ + const readable = new Readable({ + read() {} + }); + readable.on('end', common.mustCall(() => { + pipeline(readable, new PassThrough(), common.mustSucceed()); + })); + readable.push(null); + readable.read(); +} + +{ + const dup = new Duplex({ + read() {}, + write(chunk, enc, cb) { + cb(); + } + }); + dup.on('end', common.mustCall(() => { + pipeline(dup, new PassThrough(), common.mustSucceed()); + })); + dup.push(null); + dup.read(); +} diff --git a/test/parallel/test-webstreams-pipeline.js b/test/parallel/test-webstreams-pipeline.js index 46bdf8718ea97a..ac673dd9d42ea7 100644 --- a/test/parallel/test-webstreams-pipeline.js +++ b/test/parallel/test-webstreams-pipeline.js @@ -2,7 +2,7 @@ const common = require('../common'); const assert = require('assert'); -const { Readable, Writable, Transform, pipeline } = require('stream'); +const { Readable, Writable, Transform, pipeline, PassThrough } = require('stream'); const { pipeline: pipelinePromise } = require('stream/promises'); const { ReadableStream, WritableStream, TransformStream } = require('stream/web'); const http = require('http'); @@ -410,3 +410,13 @@ const http = require('http'); } c.close(); } + +{ + const rs = new ReadableStream({ + start(controller) { + controller.close(); + } + }); + + pipeline(rs, new PassThrough(), common.mustSucceed()); +}