Skip to content

Commit 0b2f900

Browse files
mcollinatargos
authored andcommitted
stream: make sure 'readable' is emitted before ending the stream
Fixes: #25810 PR-URL: #26059 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
1 parent 119f83b commit 0b2f900

9 files changed

+191
-33
lines changed

lib/_stream_readable.js

+5-13
Original file line numberDiff line numberDiff line change
@@ -510,20 +510,12 @@ function onEofChunk(stream, state) {
510510
}
511511
}
512512
state.ended = true;
513+
state.needReadable = false;
513514

514-
if (state.sync) {
515-
// If we are sync, wait until next tick to emit the data.
516-
// Otherwise we risk emitting data in the flow()
517-
// the readable code triggers during a read() call
518-
emitReadable(stream);
519-
} else {
520-
// Emit 'readable' now to make sure it gets picked up.
521-
state.needReadable = false;
522-
if (!state.emittedReadable) {
523-
state.emittedReadable = true;
524-
emitReadable_(stream);
525-
}
526-
}
515+
// We are not protecting if emittedReadable = true,
516+
// so 'readable' gets scheduled anyway.
517+
state.emittedReadable = true;
518+
process.nextTick(emitReadable_, stream);
527519
}
528520

529521
// Don't emit readable right away in sync mode, because this can trigger

lib/_stream_transform.js

-3
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,6 @@ function Transform(options) {
116116
writeencoding: null
117117
};
118118

119-
// Start out asking for a readable event once data is transformed.
120-
this._readableState.needReadable = true;
121-
122119
// We have implemented the _read method, and done the other things
123120
// that Readable wants before the first _read call, so unset the
124121
// sync guard flag.

test/parallel/test-http-readable-data-event.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const server = http.createServer((req, res) => {
2929
};
3030

3131
const expectedData = [helloWorld, helloAgainLater];
32-
const expectedRead = [helloWorld, null, helloAgainLater, null];
32+
const expectedRead = [helloWorld, null, helloAgainLater, null, null];
3333

3434
const req = http.request(opts, (res) => {
3535
res.on('error', common.mustNotCall());
@@ -42,7 +42,7 @@ const server = http.createServer((req, res) => {
4242
assert.strictEqual(data, expectedRead.shift());
4343
next();
4444
} while (data !== null);
45-
}, 2));
45+
}, 3));
4646

4747
res.setEncoding('utf8');
4848
res.on('data', common.mustCall((data) => {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const stream = require('stream');
5+
const assert = require('assert');
6+
7+
{
8+
const r = new stream.Readable({
9+
read: common.mustCall(function() {
10+
this.push('content');
11+
this.push(null);
12+
})
13+
});
14+
15+
const t = new stream.Transform({
16+
transform: common.mustCall(function(chunk, encoding, callback) {
17+
this.push(chunk);
18+
return callback();
19+
}),
20+
flush: common.mustCall(function(callback) {
21+
return callback();
22+
})
23+
});
24+
25+
r.pipe(t);
26+
t.on('readable', common.mustCall(function() {
27+
while (true) {
28+
const chunk = t.read();
29+
if (!chunk)
30+
break;
31+
32+
assert.strictEqual(chunk.toString(), 'content');
33+
}
34+
}, 2));
35+
}
36+
37+
{
38+
const t = new stream.Transform({
39+
transform: common.mustCall(function(chunk, encoding, callback) {
40+
this.push(chunk);
41+
return callback();
42+
}),
43+
flush: common.mustCall(function(callback) {
44+
return callback();
45+
})
46+
});
47+
48+
t.end('content');
49+
50+
t.on('readable', common.mustCall(function() {
51+
while (true) {
52+
const chunk = t.read();
53+
if (!chunk)
54+
break;
55+
assert.strictEqual(chunk.toString(), 'content');
56+
}
57+
}, 2));
58+
}
59+
60+
{
61+
const t = new stream.Transform({
62+
transform: common.mustCall(function(chunk, encoding, callback) {
63+
this.push(chunk);
64+
return callback();
65+
}),
66+
flush: common.mustCall(function(callback) {
67+
return callback();
68+
})
69+
});
70+
71+
t.write('content');
72+
t.end();
73+
74+
t.on('readable', common.mustCall(function() {
75+
while (true) {
76+
const chunk = t.read();
77+
if (!chunk)
78+
break;
79+
assert.strictEqual(chunk.toString(), 'content');
80+
}
81+
}, 2));
82+
}
83+
84+
{
85+
const t = new stream.Readable({
86+
read() {
87+
}
88+
});
89+
90+
t.on('readable', common.mustCall(function() {
91+
while (true) {
92+
const chunk = t.read();
93+
if (!chunk)
94+
break;
95+
assert.strictEqual(chunk.toString(), 'content');
96+
}
97+
}, 2));
98+
99+
t.push('content');
100+
t.push(null);
101+
}
102+
103+
{
104+
const t = new stream.Readable({
105+
read() {
106+
}
107+
});
108+
109+
t.on('readable', common.mustCall(function() {
110+
while (true) {
111+
const chunk = t.read();
112+
if (!chunk)
113+
break;
114+
assert.strictEqual(chunk.toString(), 'content');
115+
}
116+
}, 2));
117+
118+
process.nextTick(() => {
119+
t.push('content');
120+
t.push(null);
121+
});
122+
}
123+
124+
{
125+
const t = new stream.Transform({
126+
transform: common.mustCall(function(chunk, encoding, callback) {
127+
this.push(chunk);
128+
return callback();
129+
}),
130+
flush: common.mustCall(function(callback) {
131+
return callback();
132+
})
133+
});
134+
135+
t.on('readable', common.mustCall(function() {
136+
while (true) {
137+
const chunk = t.read();
138+
if (!chunk)
139+
break;
140+
assert.strictEqual(chunk.toString(), 'content');
141+
}
142+
}, 2));
143+
144+
t.write('content');
145+
t.end();
146+
}

test/parallel/test-stream-readable-emittedReadable.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,23 @@ const noRead = new Readable({
4343
read: () => {}
4444
});
4545

46-
noRead.on('readable', common.mustCall(() => {
46+
noRead.once('readable', common.mustCall(() => {
4747
// emittedReadable should be true when the readable event is emitted
4848
assert.strictEqual(noRead._readableState.emittedReadable, true);
4949
noRead.read(0);
5050
// emittedReadable is not reset during read(0)
5151
assert.strictEqual(noRead._readableState.emittedReadable, true);
52+
53+
noRead.on('readable', common.mustCall(() => {
54+
// The second 'readable' is emitted because we are ending
55+
56+
// emittedReadable should be true when the readable event is emitted
57+
assert.strictEqual(noRead._readableState.emittedReadable, false);
58+
noRead.read(0);
59+
// emittedReadable is not reset during read(0)
60+
assert.strictEqual(noRead._readableState.emittedReadable, false);
61+
62+
}));
5263
}));
5364

5465
noRead.push('foo');

test/parallel/test-stream-readable-needReadable.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ readable.on('readable', common.mustCall(() => {
1414
// When the readable event fires, needReadable is reset.
1515
assert.strictEqual(readable._readableState.needReadable, false);
1616
readable.read();
17-
}));
17+
}, 2));
1818

1919
// If a readable listener is attached, then a readable event is needed.
2020
assert.strictEqual(readable._readableState.needReadable, true);
@@ -74,12 +74,14 @@ const slowProducer = new Readable({
7474
});
7575

7676
slowProducer.on('readable', common.mustCall(() => {
77-
if (slowProducer.read(8) === null) {
77+
const chunk = slowProducer.read(8);
78+
const state = slowProducer._readableState;
79+
if (chunk === null) {
7880
// The buffer doesn't have enough data, and the stream is not need,
7981
// we need to notify the reader when data arrives.
80-
assert.strictEqual(slowProducer._readableState.needReadable, true);
82+
assert.strictEqual(state.needReadable, true);
8183
} else {
82-
assert.strictEqual(slowProducer._readableState.needReadable, false);
84+
assert.strictEqual(state.needReadable, false);
8385
}
8486
}, 4));
8587

test/parallel/test-stream-readable-reading-readingMore.js

+6-4
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const Readable = require('stream').Readable;
3131
assert.strictEqual(state.reading, false);
3232
}
3333

34-
const expectedReadingMore = [true, false];
34+
const expectedReadingMore = [true, false, false];
3535
readable.on('readable', common.mustCall(() => {
3636
// There is only one readingMore scheduled from on('data'),
3737
// after which everything is governed by the .read() call
@@ -40,10 +40,12 @@ const Readable = require('stream').Readable;
4040
// If the stream has ended, we shouldn't be reading
4141
assert.strictEqual(state.ended, !state.reading);
4242

43-
const data = readable.read();
44-
if (data === null) // reached end of stream
43+
// consume all the data
44+
while (readable.read() !== null) {}
45+
46+
if (expectedReadingMore.length === 0) // reached end of stream
4547
process.nextTick(common.mustCall(onStreamEnd, 1));
46-
}, 2));
48+
}, 3));
4749

4850
readable.on('end', common.mustCall(onStreamEnd));
4951
readable.push('pushed');

test/parallel/test-stream2-httpclient-response-end.js

+5-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@ const server = http.createServer(function(req, res) {
1111
let data = '';
1212
res.on('readable', common.mustCall(function() {
1313
console.log('readable event');
14-
data += res.read();
15-
}));
14+
let chunk;
15+
while ((chunk = res.read()) !== null) {
16+
data += chunk;
17+
}
18+
}, 2));
1619
res.on('end', common.mustCall(function() {
1720
console.log('end event');
1821
assert.strictEqual(msg, data);

test/parallel/test-stream2-transform.js

+9-4
Original file line numberDiff line numberDiff line change
@@ -321,11 +321,16 @@ const Transform = require('_stream_transform');
321321

322322
pt.end();
323323

324-
assert.strictEqual(emits, 1);
325-
assert.strictEqual(pt.read(5).toString(), 'l');
326-
assert.strictEqual(pt.read(5), null);
324+
// The next readable is emitted on the next tick.
325+
assert.strictEqual(emits, 0);
327326

328-
assert.strictEqual(emits, 1);
327+
process.on('nextTick', function() {
328+
assert.strictEqual(emits, 1);
329+
assert.strictEqual(pt.read(5).toString(), 'l');
330+
assert.strictEqual(pt.read(5), null);
331+
332+
assert.strictEqual(emits, 1);
333+
});
329334
}
330335

331336
{

0 commit comments

Comments
 (0)