Skip to content

Commit 5f7d2b5

Browse files
rluvatondanielleadams
authored andcommitted
stream: add compose operator
PR-URL: #44937 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent ba0e7ae commit 5f7d2b5

File tree

4 files changed

+210
-15
lines changed

4 files changed

+210
-15
lines changed

doc/api/stream.md

+39
Original file line numberDiff line numberDiff line change
@@ -1679,6 +1679,41 @@ option. In the code example above, data will be in a single chunk if the file
16791679
has less then 64 KiB of data because no `highWaterMark` option is provided to
16801680
[`fs.createReadStream()`][].
16811681

1682+
##### `readable.compose(stream[, options])`
1683+
1684+
<!-- YAML
1685+
added: REPLACEME
1686+
-->
1687+
1688+
> Stability: 1 - Experimental
1689+
1690+
* `stream` {Stream|Iterable|AsyncIterable|Function}
1691+
* `options` {Object}
1692+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1693+
aborted.
1694+
* Returns: {Duplex} a stream composed with the stream `stream`.
1695+
1696+
```mjs
1697+
import { Readable } from 'node:stream';
1698+
1699+
async function* splitToWords(source) {
1700+
for await (const chunk of source) {
1701+
const words = String(chunk).split(' ');
1702+
1703+
for (const word of words) {
1704+
yield word;
1705+
}
1706+
}
1707+
}
1708+
1709+
const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
1710+
const words = await wordsStream.toArray();
1711+
1712+
console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
1713+
```
1714+
1715+
See [`stream.compose`][] for more information.
1716+
16821717
##### `readable.iterator([options])`
16831718

16841719
<!-- YAML
@@ -2698,6 +2733,8 @@ await finished(compose(s1, s2, s3));
26982733
console.log(res); // prints 'HELLOWORLD'
26992734
```
27002735

2736+
See [`readable.compose(stream)`][] for `stream.compose` as operator.
2737+
27012738
### `stream.Readable.from(iterable[, options])`
27022739

27032740
<!-- YAML
@@ -4465,11 +4502,13 @@ contain multi-byte characters.
44654502
[`process.stdin`]: process.md#processstdin
44664503
[`process.stdout`]: process.md#processstdout
44674504
[`readable._read()`]: #readable_readsize
4505+
[`readable.compose(stream)`]: #readablecomposestream-options
44684506
[`readable.map`]: #readablemapfn-options
44694507
[`readable.push('')`]: #readablepush
44704508
[`readable.setEncoding()`]: #readablesetencodingencoding
44714509
[`stream.Readable.from()`]: #streamreadablefromiterable-options
44724510
[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
4511+
[`stream.compose`]: #streamcomposestreams
44734512
[`stream.cork()`]: #writablecork
44744513
[`stream.finished()`]: #streamfinishedstream-options-callback
44754514
[`stream.pipe()`]: #readablepipedestination-options

lib/internal/streams/operators.js

+32
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const { AbortController } = require('internal/abort_controller');
44

55
const {
66
codes: {
7+
ERR_INVALID_ARG_VALUE,
78
ERR_INVALID_ARG_TYPE,
89
ERR_MISSING_ARGS,
910
ERR_OUT_OF_RANGE,
@@ -17,6 +18,11 @@ const {
1718
} = require('internal/validators');
1819
const { kWeakHandler } = require('internal/event_target');
1920
const { finished } = require('internal/streams/end-of-stream');
21+
const staticCompose = require('internal/streams/compose');
22+
const {
23+
addAbortSignalNoValidate,
24+
} = require('internal/streams/add-abort-signal');
25+
const { isWritable, isNodeStream } = require('internal/streams/utils');
2026

2127
const {
2228
ArrayPrototypePush,
@@ -32,6 +38,31 @@ const {
3238
const kEmpty = Symbol('kEmpty');
3339
const kEof = Symbol('kEof');
3440

41+
function compose(stream, options) {
42+
if (options != null) {
43+
validateObject(options, 'options');
44+
}
45+
if (options?.signal != null) {
46+
validateAbortSignal(options.signal, 'options.signal');
47+
}
48+
49+
if (isNodeStream(stream) && !isWritable(stream)) {
50+
throw new ERR_INVALID_ARG_VALUE('stream', stream, 'must be writable');
51+
}
52+
53+
const composedStream = staticCompose(this, stream);
54+
55+
if (options?.signal) {
56+
// Not validating as we already validated before
57+
addAbortSignalNoValidate(
58+
options.signal,
59+
composedStream
60+
);
61+
}
62+
63+
return composedStream;
64+
}
65+
3566
function map(fn, options) {
3667
if (typeof fn !== 'function') {
3768
throw new ERR_INVALID_ARG_TYPE(
@@ -392,6 +423,7 @@ module.exports.streamReturningOperators = {
392423
flatMap,
393424
map,
394425
take,
426+
compose,
395427
};
396428

397429
module.exports.promiseReturningOperators = {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable, Transform,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
{
10+
// with async generator
11+
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
12+
let str = '';
13+
for await (const chunk of stream) {
14+
str += chunk;
15+
16+
if (str.length === 2) {
17+
yield str;
18+
str = '';
19+
}
20+
}
21+
});
22+
const result = ['ab', 'cd'];
23+
(async () => {
24+
for await (const item of stream) {
25+
assert.strictEqual(item, result.shift());
26+
}
27+
})().then(common.mustCall());
28+
}
29+
30+
{
31+
// With Transformer
32+
const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
33+
objectMode: true,
34+
transform: common.mustCall((chunk, encoding, callback) => {
35+
callback(null, chunk);
36+
}, 4)
37+
}));
38+
const result = ['a', 'b', 'c', 'd'];
39+
(async () => {
40+
for await (const item of stream) {
41+
assert.strictEqual(item, result.shift());
42+
}
43+
})().then(common.mustCall());
44+
}
45+
46+
{
47+
// Throwing an error during `compose` (before waiting for data)
48+
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
49+
50+
throw new Error('boom');
51+
});
52+
53+
assert.rejects(async () => {
54+
for await (const item of stream) {
55+
assert.fail('should not reach here, got ' + item);
56+
}
57+
}, /boom/).then(common.mustCall());
58+
}
59+
60+
{
61+
// Throwing an error during `compose` (when waiting for data)
62+
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
63+
for await (const chunk of stream) {
64+
if (chunk === 3) {
65+
throw new Error('boom');
66+
}
67+
yield chunk;
68+
}
69+
});
70+
71+
assert.rejects(
72+
stream.toArray(),
73+
/boom/,
74+
).then(common.mustCall());
75+
}
76+
77+
{
78+
// Throwing an error during `compose` (after finishing all readable data)
79+
const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
80+
81+
// eslint-disable-next-line no-unused-vars,no-empty
82+
for await (const chunk of stream) {
83+
}
84+
85+
throw new Error('boom');
86+
});
87+
assert.rejects(
88+
stream.toArray(),
89+
/boom/,
90+
).then(common.mustCall());
91+
}
92+
93+
{
94+
// AbortSignal
95+
const ac = new AbortController();
96+
const stream = Readable.from([1, 2, 3, 4, 5])
97+
.compose(async function *(source) {
98+
// Should not reach here
99+
for await (const chunk of source) {
100+
yield chunk;
101+
}
102+
}, { signal: ac.signal });
103+
104+
ac.abort();
105+
106+
assert.rejects(async () => {
107+
for await (const item of stream) {
108+
assert.fail('should not reach here, got ' + item);
109+
}
110+
}, {
111+
name: 'AbortError',
112+
}).then(common.mustCall());
113+
}
114+
115+
{
116+
assert.throws(
117+
() => Readable.from(['a']).compose(Readable.from(['b'])),
118+
{ code: 'ERR_INVALID_ARG_VALUE' }
119+
);
120+
}
121+
122+
{
123+
assert.throws(
124+
() => Readable.from(['a']).compose(),
125+
{ code: 'ERR_INVALID_ARG_TYPE' }
126+
);
127+
}

test/parallel/test-stream-compose.js

+12-15
Original file line numberDiff line numberDiff line change
@@ -358,27 +358,24 @@ const assert = require('assert');
358358
}
359359

360360
{
361-
try {
362-
compose();
363-
} catch (err) {
364-
assert.strictEqual(err.code, 'ERR_MISSING_ARGS');
365-
}
361+
assert.throws(
362+
() => compose(),
363+
{ code: 'ERR_MISSING_ARGS' }
364+
);
366365
}
367366

368367
{
369-
try {
370-
compose(new Writable(), new PassThrough());
371-
} catch (err) {
372-
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
373-
}
368+
assert.throws(
369+
() => compose(new Writable(), new PassThrough()),
370+
{ code: 'ERR_INVALID_ARG_VALUE' }
371+
);
374372
}
375373

376374
{
377-
try {
378-
compose(new PassThrough(), new Readable({ read() {} }), new PassThrough());
379-
} catch (err) {
380-
assert.strictEqual(err.code, 'ERR_INVALID_ARG_VALUE');
381-
}
375+
assert.throws(
376+
() => compose(new PassThrough(), new Readable({ read() {} }), new PassThrough()),
377+
{ code: 'ERR_INVALID_ARG_VALUE' }
378+
);
382379
}
383380

384381
{

0 commit comments

Comments
 (0)