-
Notifications
You must be signed in to change notification settings - Fork 30.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add the toArray method from the TC39 iterator helper proposal to Readable streams. This also enables a common-use case of converting a stream to an array. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41553 Reviewed-By: Robert Nagy <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
- Loading branch information
1 parent
6de8e51
commit 62e1a68
Showing
3 changed files
with
136 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
'use strict'; | ||
|
||
const common = require('../common'); | ||
const { | ||
Readable, | ||
} = require('stream'); | ||
const assert = require('assert'); | ||
|
||
{ | ||
// Works on a synchronous stream | ||
(async () => { | ||
const tests = [ | ||
[], | ||
[1], | ||
[1, 2, 3], | ||
Array(100).fill().map((_, i) => i), | ||
]; | ||
for (const test of tests) { | ||
const stream = Readable.from(test); | ||
const result = await stream.toArray(); | ||
assert.deepStrictEqual(result, test); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Works on a non-object-mode stream and flattens it | ||
(async () => { | ||
const stream = Readable.from( | ||
[Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])] | ||
, { objectMode: false }); | ||
const result = await stream.toArray(); | ||
assert.strictEqual(Buffer.isBuffer(result), true); | ||
assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]); | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Works on an asynchronous stream | ||
(async () => { | ||
const tests = [ | ||
[], | ||
[1], | ||
[1, 2, 3], | ||
Array(100).fill().map((_, i) => i), | ||
]; | ||
for (const test of tests) { | ||
const stream = Readable.from(test).map((x) => Promise.resolve(x)); | ||
const result = await stream.toArray(); | ||
assert.deepStrictEqual(result, test); | ||
} | ||
})().then(common.mustCall()); | ||
} | ||
|
||
{ | ||
// Support for AbortSignal | ||
const ac = new AbortController(); | ||
let stream; | ||
assert.rejects(async () => { | ||
stream = Readable.from([1, 2, 3]).map(async (x) => { | ||
if (x === 3) { | ||
await new Promise(() => {}); // Explicitly do not pass signal here | ||
} | ||
return Promise.resolve(x); | ||
}); | ||
await stream.toArray({ signal: ac.signal }); | ||
}, { | ||
name: 'AbortError', | ||
}).then(common.mustCall(() => { | ||
// Only stops toArray, does not destory the stream | ||
assert(stream.destroyed, false); | ||
})); | ||
ac.abort(); | ||
} | ||
{ | ||
// Test result is a Promise | ||
const result = Readable.from([1, 2, 3, 4, 5]).toArray(); | ||
assert.strictEqual(result instanceof Promise, true); | ||
} |