From e9a29f3aae6ae0f95bc8ca209cd280c1ed888b71 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Thu, 31 May 2018 12:11:22 +0200 Subject: [PATCH 1/2] streams: make the pipeline callback mandatory Right now when not adding a callback to the pipeline it could cause an uncaught exception if there is an error. Instead, just make the callback mandatory as mostly done in all other Node.js callback APIs so users explicitly have to decide what to do in such situations. --- lib/internal/streams/pipeline.js | 13 ++++++------- test/parallel/test-stream-pipeline.js | 19 +++++-------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 849b3d39dbe25b..caa4042339bd37 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -6,6 +6,7 @@ let eos; const { + ERR_INVALID_CALLBACK, ERR_MISSING_ARGS, ERR_STREAM_DESTROYED } = require('internal/errors').codes; @@ -19,11 +20,6 @@ function once(callback) { }; } -function noop(err) { - // Rethrow the error if it exists to avoid swallowing it - if (err) throw err; -} - function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; } @@ -66,8 +62,11 @@ function pipe(from, to) { } function popCallback(streams) { - if (!streams.length) return noop; - if (typeof streams[streams.length - 1] !== 'function') return noop; + // Streams should never be an empty array. It should always contain at least + // a single stream. Therefore optimize for the average case instead of + // checking for length === 0 as well. + if (typeof streams[streams.length - 1] !== 'function') + throw new ERR_INVALID_CALLBACK(); return streams.pop(); } diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index b52d60529b0332..db6e0a0cbb8ebb 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -60,7 +60,7 @@ common.crashOnUnhandledRejection(); }, /ERR_MISSING_ARGS/); assert.throws(() => { pipeline(); - }, /ERR_MISSING_ARGS/); + }, /ERR_INVALID_CALLBACK/); } { @@ -499,17 +499,8 @@ common.crashOnUnhandledRejection(); } }); - read.on('close', common.mustCall()); - transform.on('close', common.mustCall()); - write.on('close', common.mustCall()); - - process.on('uncaughtException', common.mustCall((err) => { - assert.deepStrictEqual(err, new Error('kaboom')); - })); - - const dst = pipeline(read, transform, write); - - assert.strictEqual(dst, write); - - read.push('hello'); + assert.throws( + () => pipeline(read, transform, write), + { code: 'ERR_INVALID_CALLBACK' } + ); } From 3bd86d17303d8385922e7ac6bc7fb1a03b63bb07 Mon Sep 17 00:00:00 2001 From: Ruben Bridgewater Date: Thu, 31 May 2018 16:07:32 +0200 Subject: [PATCH 2/2] fixup --- doc/api/stream.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/api/stream.md b/doc/api/stream.md index 483aec1b700fb6..dbe5e22bb39ead 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1340,14 +1340,14 @@ run().catch(console.error); rs.resume(); // drain the stream ``` -### stream.pipeline(...streams[, callback]) +### stream.pipeline(...streams, callback) * `...streams` {Stream} Two or more streams to pipe between. -* `callback` {Function} A callback function that takes an optional error - argument. +* `callback` {Function} Called when the pipeline is fully done. + * `err` {Error} A module method to pipe between streams forwarding errors and properly cleaning up and provide a callback when the pipeline is complete.