Skip to content

Commit

Permalink
stream: add asIndexedPairs
Browse files Browse the repository at this point in the history
Add the asIndexedPairs method for readable streams.

PR-URL: #41681
Refs: https://github.com/tc39/proposal-iterator-helpers#asindexedpairs
Reviewed-By: Robert Nagy <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
  • Loading branch information
benjamingr authored and ruyadorno committed Feb 7, 2022
1 parent ae34900 commit 311050e
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
24 changes: 24 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2064,6 +2064,30 @@ import { Readable } from 'stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

### `readable.asIndexedPairs([options])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1 - Experimental
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream of indexed pairs.

This method returns a new stream with chunks of the underlying stream paired
with a counter in the form `[index, chunk]`. The first index value is 0 and it
increases by 1 for each chunk produced.

```mjs
import { Readable } from 'stream';

const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
```

### Duplex and transform streams

#### Class: `stream.Duplex`
Expand Down
11 changes: 11 additions & 0 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ async function * map(fn, options) {
}
}

async function* asIndexedPairs(options) {
let index = 0;
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError({ cause: options.signal.reason });
}
yield [index++, val];
}
}

async function some(fn, options) {
// https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
// Note that some does short circuit but also closes the iterator if it does
Expand Down Expand Up @@ -286,6 +296,7 @@ function take(number, options) {
}

module.exports.streamReturningOperators = {
asIndexedPairs,
drop,
filter,
flatMap,
Expand Down
47 changes: 47 additions & 0 deletions test/parallel/test-stream-asIndexedPairs.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import '../common/index.mjs';
import { Readable } from 'stream';
import { deepStrictEqual, rejects } from 'assert';

{
// asIndexedPairs with a synchronous stream
const pairs = await Readable.from([1, 2, 3]).asIndexedPairs().toArray();
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
const empty = await Readable.from([]).asIndexedPairs().toArray();
deepStrictEqual(empty, []);
}

{
// asIndexedPairs works an asynchronous streams
const asyncFrom = (...args) => Readable.from(...args).map(async (x) => x);
const pairs = await asyncFrom([1, 2, 3]).asIndexedPairs().toArray();
deepStrictEqual(pairs, [[0, 1], [1, 2], [2, 3]]);
const empty = await asyncFrom([]).asIndexedPairs().toArray();
deepStrictEqual(empty, []);
}

{
// Does not enumerate an infinite stream
const infinite = () => Readable.from(async function* () {
while (true) yield 1;
}());
const pairs = await infinite().asIndexedPairs().take(3).toArray();
deepStrictEqual(pairs, [[0, 1], [1, 1], [2, 1]]);
const empty = await infinite().asIndexedPairs().take(0).toArray();
deepStrictEqual(empty, []);
}

{
// AbortSignal
await rejects(async () => {
const ac = new AbortController();
const { signal } = ac;
const p = Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
ac.abort();
await p;
}, { name: 'AbortError' });

await rejects(async () => {
const signal = AbortSignal.abort();
await Readable.from([1, 2, 3]).asIndexedPairs({ signal }).toArray();
}, /AbortError/);
}

0 comments on commit 311050e

Please sign in to comment.