Skip to content

Commit 744a284

Browse files
committedAug 4, 2020
stream: support async for stream impl functions
PR-URL: #34416 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anto Aravinth <anto.aravinth.cse@gmail.com>
1 parent ca26eae commit 744a284

File tree

4 files changed

+453
-5
lines changed

4 files changed

+453
-5
lines changed
 

‎lib/_stream_transform.js

+65-2
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,10 @@ function Transform(options) {
107107
}
108108

109109
function final(cb) {
110+
let called = false;
110111
if (typeof this._flush === 'function' && !this.destroyed) {
111-
this._flush((er, data) => {
112+
const result = this._flush((er, data) => {
113+
called = true;
112114
if (er) {
113115
if (cb) {
114116
cb(er);
@@ -126,6 +128,33 @@ function final(cb) {
126128
cb();
127129
}
128130
});
131+
if (result !== undefined && result !== null) {
132+
try {
133+
const then = result.then;
134+
if (typeof then === 'function') {
135+
then.call(
136+
result,
137+
(data) => {
138+
if (called)
139+
return;
140+
if (data != null)
141+
this.push(data);
142+
this.push(null);
143+
if (cb)
144+
process.nextTick(cb);
145+
},
146+
(err) => {
147+
if (cb) {
148+
process.nextTick(cb, err);
149+
} else {
150+
process.nextTick(() => this.destroy(err));
151+
}
152+
});
153+
}
154+
} catch (err) {
155+
process.nextTick(() => this.destroy(err));
156+
}
157+
}
129158
} else {
130159
this.push(null);
131160
if (cb) {
@@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) {
151180
const wState = this._writableState;
152181
const length = rState.length;
153182

154-
this._transform(chunk, encoding, (err, val) => {
183+
let called = false;
184+
const result = this._transform(chunk, encoding, (err, val) => {
185+
called = true;
155186
if (err) {
156187
callback(err);
157188
return;
@@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) {
172203
this[kCallback] = callback;
173204
}
174205
});
206+
if (result !== undefined && result != null) {
207+
try {
208+
const then = result.then;
209+
if (typeof then === 'function') {
210+
then.call(
211+
result,
212+
(val) => {
213+
if (called)
214+
return;
215+
216+
if (val != null) {
217+
this.push(val);
218+
}
219+
220+
if (
221+
wState.ended ||
222+
length === rState.length ||
223+
rState.length < rState.highWaterMark ||
224+
rState.length === 0) {
225+
process.nextTick(callback);
226+
} else {
227+
this[kCallback] = callback;
228+
}
229+
},
230+
(err) => {
231+
process.nextTick(callback, err);
232+
});
233+
}
234+
} catch (err) {
235+
process.nextTick(callback, err);
236+
}
237+
}
175238
};
176239

177240
Transform.prototype._read = function() {

‎lib/_stream_writable.js

+26-1
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ function needFinish(state) {
647647
function callFinal(stream, state) {
648648
state.sync = true;
649649
state.pendingcb++;
650-
stream._final((err) => {
650+
const result = stream._final((err) => {
651651
state.pendingcb--;
652652
if (err) {
653653
for (const callback of state[kOnFinished].splice(0)) {
@@ -664,6 +664,31 @@ function callFinal(stream, state) {
664664
process.nextTick(finish, stream, state);
665665
}
666666
});
667+
if (result !== undefined && result !== null) {
668+
try {
669+
const then = result.then;
670+
if (typeof then === 'function') {
671+
then.call(
672+
result,
673+
function() {
674+
if (state.prefinished)
675+
return;
676+
state.prefinish = true;
677+
process.nextTick(() => stream.emit('prefinish'));
678+
state.pendingcb++;
679+
process.nextTick(finish, stream, state);
680+
},
681+
function(err) {
682+
for (const callback of state[kOnFinished].splice(0)) {
683+
process.nextTick(callback, err);
684+
}
685+
process.nextTick(errorOrDestroy, stream, err, state.sync);
686+
});
687+
}
688+
} catch (err) {
689+
process.nextTick(errorOrDestroy, stream, err, state.sync);
690+
}
691+
}
667692
state.sync = false;
668693
}
669694

‎lib/internal/streams/destroy.js

+104-2
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,13 @@ function destroy(err, cb) {
5959
}
6060

6161
function _destroy(self, err, cb) {
62-
self._destroy(err || null, (err) => {
62+
let called = false;
63+
const result = self._destroy(err || null, (err) => {
6364
const r = self._readableState;
6465
const w = self._writableState;
6566

67+
called = true;
68+
6669
if (err) {
6770
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
6871
err.stack;
@@ -92,6 +95,64 @@ function _destroy(self, err, cb) {
9295
process.nextTick(emitCloseNT, self);
9396
}
9497
});
98+
if (result !== undefined && result !== null) {
99+
try {
100+
const then = result.then;
101+
if (typeof then === 'function') {
102+
then.call(
103+
result,
104+
function() {
105+
if (called)
106+
return;
107+
108+
const r = self._readableState;
109+
const w = self._writableState;
110+
111+
if (w) {
112+
w.closed = true;
113+
}
114+
if (r) {
115+
r.closed = true;
116+
}
117+
118+
if (typeof cb === 'function') {
119+
process.nextTick(cb);
120+
}
121+
122+
process.nextTick(emitCloseNT, self);
123+
},
124+
function(err) {
125+
const r = self._readableState;
126+
const w = self._writableState;
127+
err.stack;
128+
129+
called = true;
130+
131+
if (w && !w.errored) {
132+
w.errored = err;
133+
}
134+
if (r && !r.errored) {
135+
r.errored = err;
136+
}
137+
138+
if (w) {
139+
w.closed = true;
140+
}
141+
if (r) {
142+
r.closed = true;
143+
}
144+
145+
if (typeof cb === 'function') {
146+
process.nextTick(cb, err);
147+
}
148+
149+
process.nextTick(emitErrorCloseNT, self, err);
150+
});
151+
}
152+
} catch (err) {
153+
process.nextTick(emitErrorNT, self, err);
154+
}
155+
}
95156
}
96157

97158
function emitErrorCloseNT(self, err) {
@@ -230,7 +291,7 @@ function constructNT(stream) {
230291
const s = w || r;
231292

232293
let called = false;
233-
stream._construct((err) => {
294+
const result = stream._construct((err) => {
234295
if (r) {
235296
r.constructed = true;
236297
}
@@ -252,6 +313,47 @@ function constructNT(stream) {
252313
process.nextTick(emitConstructNT, stream);
253314
}
254315
});
316+
if (result !== undefined && result !== null) {
317+
try {
318+
const then = result.then;
319+
if (typeof then === 'function') {
320+
then.call(
321+
result,
322+
function() {
323+
// If the callback was invoked, do nothing further.
324+
if (called)
325+
return;
326+
if (r) {
327+
r.constructed = true;
328+
}
329+
if (w) {
330+
w.constructed = true;
331+
}
332+
if (s.destroyed) {
333+
process.nextTick(() => stream.emit(kDestroy));
334+
} else {
335+
process.nextTick(emitConstructNT, stream);
336+
}
337+
},
338+
function(err) {
339+
if (r) {
340+
r.constructed = true;
341+
}
342+
if (w) {
343+
w.constructed = true;
344+
}
345+
called = true;
346+
if (s.destroyed) {
347+
process.nextTick(() => stream.emit(kDestroy, err));
348+
} else {
349+
process.nextTick(errorOrDestroy, stream, err);
350+
}
351+
});
352+
}
353+
} catch (err) {
354+
process.nextTick(emitErrorNT, stream, err);
355+
}
356+
}
255357
}
256358

257359
function emitConstructNT(stream) {

0 commit comments

Comments
 (0)