Skip to content

Commit b93a967

Browse files
marco-ippolitotargos
authored andcommitted
doc: add stream/promises pipeline and finished to doc
PR-URL: #45832 Fixes: #45821 Reviewed-By: Paolo Insogna <paolo@cowtech.it> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent 5de08ef commit b93a967

File tree

1 file changed

+227
-107
lines changed

1 file changed

+227
-107
lines changed

doc/api/stream.md

+227-107
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,227 @@ functions for streams that return `Promise` objects rather than using
5959
callbacks. The API is accessible via `require('node:stream/promises')`
6060
or `require('node:stream').promises`.
6161

62+
### `stream.pipeline(source[, ...transforms], destination[, options])`
63+
64+
### `stream.pipeline(streams[, options])`
65+
66+
<!-- YAML
67+
added: v15.0.0
68+
-->
69+
70+
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
71+
* `source` {Stream|Iterable|AsyncIterable|Function}
72+
* Returns: {Promise|AsyncIterable}
73+
* `...transforms` {Stream|Function}
74+
* `source` {AsyncIterable}
75+
* Returns: {Promise|AsyncIterable}
76+
* `destination` {Stream|Function}
77+
* `source` {AsyncIterable}
78+
* Returns: {Promise|AsyncIterable}
79+
* `options` {Object}
80+
* `signal` {AbortSignal}
81+
* `end` {boolean}
82+
* Returns: {Promise} Fulfills when the pipeline is complete.
83+
84+
```cjs
85+
const { pipeline } = require('node:stream/promises');
86+
const fs = require('node:fs');
87+
const zlib = require('node:zlib');
88+
89+
async function run() {
90+
await pipeline(
91+
fs.createReadStream('archive.tar'),
92+
zlib.createGzip(),
93+
fs.createWriteStream('archive.tar.gz'),
94+
);
95+
console.log('Pipeline succeeded.');
96+
}
97+
98+
run().catch(console.error);
99+
```
100+
101+
```mjs
102+
import { pipeline } from 'node:stream/promises';
103+
import { createReadStream, createWriteStream } from 'node:fs';
104+
import { createGzip } from 'node:zlib';
105+
106+
await pipeline(
107+
createReadStream('archive.tar'),
108+
createGzip(),
109+
createWriteStream('archive.tar.gz'),
110+
);
111+
console.log('Pipeline succeeded.');
112+
```
113+
114+
To use an `AbortSignal`, pass it inside an options object, as the last argument.
115+
When the signal is aborted, `destroy` will be called on the underlying pipeline,
116+
with an `AbortError`.
117+
118+
```cjs
119+
const { pipeline } = require('node:stream/promises');
120+
const fs = require('node:fs');
121+
const zlib = require('node:zlib');
122+
123+
async function run() {
124+
const ac = new AbortController();
125+
const signal = ac.signal;
126+
127+
setImmediate(() => ac.abort());
128+
await pipeline(
129+
fs.createReadStream('archive.tar'),
130+
zlib.createGzip(),
131+
fs.createWriteStream('archive.tar.gz'),
132+
{ signal },
133+
);
134+
}
135+
136+
run().catch(console.error); // AbortError
137+
```
138+
139+
```mjs
140+
import { pipeline } from 'node:stream/promises';
141+
import { createReadStream, createWriteStream } from 'node:fs';
142+
import { createGzip } from 'node:zlib';
143+
144+
const ac = new AbortController();
145+
const { signal } = ac;
146+
setImmediate(() => ac.abort());
147+
try {
148+
await pipeline(
149+
createReadStream('archive.tar'),
150+
createGzip(),
151+
createWriteStream('archive.tar.gz'),
152+
{ signal },
153+
);
154+
} catch (err) {
155+
console.error(err); // AbortError
156+
}
157+
```
158+
159+
The `pipeline` API also supports async generators:
160+
161+
```cjs
162+
const { pipeline } = require('node:stream/promises');
163+
const fs = require('node:fs');
164+
165+
async function run() {
166+
await pipeline(
167+
fs.createReadStream('lowercase.txt'),
168+
async function* (source, { signal }) {
169+
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
170+
for await (const chunk of source) {
171+
yield await processChunk(chunk, { signal });
172+
}
173+
},
174+
fs.createWriteStream('uppercase.txt'),
175+
);
176+
console.log('Pipeline succeeded.');
177+
}
178+
179+
run().catch(console.error);
180+
```
181+
182+
```mjs
183+
import { pipeline } from 'node:stream/promises';
184+
import { createReadStream, createWriteStream } from 'node:fs';
185+
186+
await pipeline(
187+
createReadStream('lowercase.txt'),
188+
async function* (source, { signal }) {
189+
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
190+
for await (const chunk of source) {
191+
yield await processChunk(chunk, { signal });
192+
}
193+
},
194+
createWriteStream('uppercase.txt'),
195+
);
196+
console.log('Pipeline succeeded.');
197+
```
198+
199+
Remember to handle the `signal` argument passed into the async generator.
200+
Especially in the case where the async generator is the source for the
201+
pipeline (i.e. first argument) or the pipeline will never complete.
202+
203+
```cjs
204+
const { pipeline } = require('node:stream/promises');
205+
const fs = require('node:fs');
206+
207+
async function run() {
208+
await pipeline(
209+
async function* ({ signal }) {
210+
await someLongRunningfn({ signal });
211+
yield 'asd';
212+
},
213+
fs.createWriteStream('uppercase.txt'),
214+
);
215+
console.log('Pipeline succeeded.');
216+
}
217+
218+
run().catch(console.error);
219+
```
220+
221+
```mjs
222+
import { pipeline } from 'node:stream/promises';
223+
import fs from 'node:fs';
224+
await pipeline(
225+
async function* ({ signal }) {
226+
await someLongRunningfn({ signal });
227+
yield 'asd';
228+
},
229+
fs.createWriteStream('uppercase.txt'),
230+
);
231+
console.log('Pipeline succeeded.');
232+
```
233+
234+
The `pipeline` API provides [callback version][stream-pipeline]:
235+
236+
### `stream.finished(stream[, options])`
237+
238+
<!-- YAML
239+
added: v15.0.0
240+
-->
241+
242+
* `stream` {Stream}
243+
* `options` {Object}
244+
* `error` {boolean|undefined}
245+
* `readable` {boolean|undefined}
246+
* `writable` {boolean|undefined}
247+
* `signal`: {AbortSignal|undefined}
248+
* Returns: {Promise} Fulfills when the stream is no
249+
longer readable or writable.
250+
251+
```cjs
252+
const { finished } = require('node:stream/promises');
253+
const fs = require('node:fs');
254+
255+
const rs = fs.createReadStream('archive.tar');
256+
257+
async function run() {
258+
await finished(rs);
259+
console.log('Stream is done reading.');
260+
}
261+
262+
run().catch(console.error);
263+
rs.resume(); // Drain the stream.
264+
```
265+
266+
```mjs
267+
import { finished } from 'node:stream/promises';
268+
import { createReadStream } from 'node:fs';
269+
270+
const rs = createReadStream('archive.tar');
271+
272+
async function run() {
273+
await finished(rs);
274+
console.log('Stream is done reading.');
275+
}
276+
277+
run().catch(console.error);
278+
rs.resume(); // Drain the stream.
279+
```
280+
281+
The `finished` API provides [callback version][stream-finished]:
282+
62283
### Object mode
63284

64285
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -2447,22 +2668,7 @@ Especially useful in error handling scenarios where a stream is destroyed
24472668
prematurely (like an aborted HTTP request), and will not emit `'end'`
24482669
or `'finish'`.
24492670

2450-
The `finished` API provides promise version:
2451-
2452-
```js
2453-
const { finished } = require('node:stream/promises');
2454-
const fs = require('node:fs');
2455-
2456-
const rs = fs.createReadStream('archive.tar');
2457-
2458-
async function run() {
2459-
await finished(rs);
2460-
console.log('Stream is done reading.');
2461-
}
2462-
2463-
run().catch(console.error);
2464-
rs.resume(); // Drain the stream.
2465-
```
2671+
The `finished` API provides [promise version][stream-finished-promise].
24662672

24672673
`stream.finished()` leaves dangling event listeners (in particular
24682674
`'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been
@@ -2542,97 +2748,7 @@ pipeline(
25422748
);
25432749
```
25442750

2545-
The `pipeline` API provides a promise version, which can also
2546-
receive an options argument as the last parameter with a
2547-
`signal` {AbortSignal} property. When the signal is aborted,
2548-
`destroy` will be called on the underlying pipeline, with an
2549-
`AbortError`.
2550-
2551-
```js
2552-
const { pipeline } = require('node:stream/promises');
2553-
const fs = require('node:fs');
2554-
const zlib = require('node:zlib');
2555-
2556-
async function run() {
2557-
await pipeline(
2558-
fs.createReadStream('archive.tar'),
2559-
zlib.createGzip(),
2560-
fs.createWriteStream('archive.tar.gz'),
2561-
);
2562-
console.log('Pipeline succeeded.');
2563-
}
2564-
2565-
run().catch(console.error);
2566-
```
2567-
2568-
To use an `AbortSignal`, pass it inside an options object,
2569-
as the last argument:
2570-
2571-
```js
2572-
const { pipeline } = require('node:stream/promises');
2573-
const fs = require('node:fs');
2574-
const zlib = require('node:zlib');
2575-
2576-
async function run() {
2577-
const ac = new AbortController();
2578-
const signal = ac.signal;
2579-
2580-
setTimeout(() => ac.abort(), 1);
2581-
await pipeline(
2582-
fs.createReadStream('archive.tar'),
2583-
zlib.createGzip(),
2584-
fs.createWriteStream('archive.tar.gz'),
2585-
{ signal },
2586-
);
2587-
}
2588-
2589-
run().catch(console.error); // AbortError
2590-
```
2591-
2592-
The `pipeline` API also supports async generators:
2593-
2594-
```js
2595-
const { pipeline } = require('node:stream/promises');
2596-
const fs = require('node:fs');
2597-
2598-
async function run() {
2599-
await pipeline(
2600-
fs.createReadStream('lowercase.txt'),
2601-
async function* (source, { signal }) {
2602-
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
2603-
for await (const chunk of source) {
2604-
yield await processChunk(chunk, { signal });
2605-
}
2606-
},
2607-
fs.createWriteStream('uppercase.txt'),
2608-
);
2609-
console.log('Pipeline succeeded.');
2610-
}
2611-
2612-
run().catch(console.error);
2613-
```
2614-
2615-
Remember to handle the `signal` argument passed into the async generator.
2616-
Especially in the case where the async generator is the source for the
2617-
pipeline (i.e. first argument) or the pipeline will never complete.
2618-
2619-
```js
2620-
const { pipeline } = require('node:stream/promises');
2621-
const fs = require('node:fs');
2622-
2623-
async function run() {
2624-
await pipeline(
2625-
async function* ({ signal }) {
2626-
await someLongRunningfn({ signal });
2627-
yield 'asd';
2628-
},
2629-
fs.createWriteStream('uppercase.txt'),
2630-
);
2631-
console.log('Pipeline succeeded.');
2632-
}
2633-
2634-
run().catch(console.error);
2635-
```
2751+
The `pipeline` API provides a [promise version][stream-pipeline-promise].
26362752

26372753
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
26382754

@@ -4566,7 +4682,11 @@ contain multi-byte characters.
45664682
[stream-_write]: #writable_writechunk-encoding-callback
45674683
[stream-_writev]: #writable_writevchunks-callback
45684684
[stream-end]: #writableendchunk-encoding-callback
4685+
[stream-finished]: #streamfinishedstream-options-callback
4686+
[stream-finished-promise]: #streamfinishedstream-options
45694687
[stream-pause]: #readablepause
4688+
[stream-pipeline]: #streampipelinesource-transforms-destination-callback
4689+
[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
45704690
[stream-push]: #readablepushchunk-encoding
45714691
[stream-read]: #readablereadsize
45724692
[stream-resume]: #readableresume

0 commit comments

Comments
 (0)