From 216bb20367a5a333287f8f876d0475f29a614f8b Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Fri, 4 Feb 2022 10:36:58 +0100 Subject: [PATCH] stream: resume stream on drain Previously we would just resume "flowing" the stream without reseting the "paused" state. Fixes this by properly using pause/resume methods for .pipe. Fixes: https://github.com/nodejs/node/issues/41785 --- lib/internal/streams/readable.js | 3 +-- .../test-stream-readable-pause-and-resume.js | 17 +++++++++++++ tmp.js | 24 +++++++++++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 tmp.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index d0125386c8ae8ee..7dc2e5346afad91 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -853,8 +853,7 @@ function pipeOnDrain(src, dest) { if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && EE.listenerCount(src, 'data')) { - state.flowing = true; - flow(src); + src.resume(); } }; } diff --git a/test/parallel/test-stream-readable-pause-and-resume.js b/test/parallel/test-stream-readable-pause-and-resume.js index 294ef2c35d4608c..f28112da10e1bb9 100644 --- a/test/parallel/test-stream-readable-pause-and-resume.js +++ b/test/parallel/test-stream-readable-pause-and-resume.js @@ -56,3 +56,20 @@ function readAndPause() { assert(readable.isPaused()); }); } + +{ + const { PassThrough } = require('stream'); + + const source3 = new PassThrough(); + const target3 = new PassThrough(); + + const chunk = Buffer.allocUnsafe(1000); + let chunks = 1; + while (target3.write(chunk)) chunks++; + + source3.pipe(target3); + target3.on('drain', common.mustCall(() => { + assert(!source3.isPaused()); + })); + target3.on('data', () => {}); +} diff --git a/tmp.js b/tmp.js new file mode 100644 index 000000000000000..0cb3f97976f8bbe --- /dev/null +++ b/tmp.js @@ -0,0 +1,24 @@ +const { PassThrough } = require('stream'); + +// THIRD EXPERIMENT +console.info('\n********** THIRD EXPERIMENT **********'); +const source3 = new PassThrough(); +const target3 = new PassThrough(); + +// stall target3 +const chunk = Buffer.allocUnsafe(1000); +let chunks = 1; +while (target3.write(chunk)) chunks++; +console.info(`${chunks} chunks of ${chunk.length} bytes to stall target3`); + +// `Readable.pipe()` PAUSES the source if the target needs drain (only in +// version >= v14.17.0) and it does not resume it after drain +console.info(`source3 before pipe. Paused: ${source3.isPaused()}`); +source3.pipe(target3); +console.info(`source3 after pipe. Paused: ${source3.isPaused()}`); +target3.on('drain', () => { + console.info('target3 drained'); + console.info(`source3 after drain. Paused: ${source3.isPaused()}`); + console.info(source3.readableFlowing); +}); +target3.on('data', () => {});