Skip to content

Commit cf5f986

Browse files
committedApr 6, 2018
stream: 'readable' have precedence over flowing
In Streams3 the 'readable' event/.read() method had a lower precedence than the `'data'` event that made them impossible to use them together. This make `.resume()` a no-op if there is a listener for the `'readable'` event, making the stream non-flowing if there is a `'data'`  listener. Fixes: #18058 PR-URL: #18994 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 1e07acd commit cf5f986

5 files changed

+269
-49
lines changed
 

‎doc/api/stream.md

+21-1
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,9 @@ changes:
762762
description: >
763763
'readable' is always emitted in the next tick after
764764
.push() is called
765+
- version: REPLACEME
766+
pr-url: https://github.com/nodejs/node/pull/18994
767+
description: Using 'readable' requires calling .read().
765768
-->
766769

767770
The `'readable'` event is emitted when there is data available to be read from
@@ -770,10 +773,16 @@ cause some amount of data to be read into an internal buffer.
770773

771774
```javascript
772775
const readable = getReadableStreamSomehow();
773-
readable.on('readable', () => {
776+
readable.on('readable', function() {
774777
// there is some data to read now
778+
let data;
779+
780+
while (data = this.read()) {
781+
console.log(data);
782+
}
775783
});
776784
```
785+
777786
The `'readable'` event will also be emitted once the end of the stream data
778787
has been reached but before the `'end'` event is emitted.
779788

@@ -806,6 +815,10 @@ In general, the `readable.pipe()` and `'data'` event mechanisms are easier to
806815
understand than the `'readable'` event. However, handling `'readable'` might
807816
result in increased throughput.
808817

818+
If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
819+
takes precedence in controlling the flow, i.e. `'data'` will be emitted
820+
only when [`stream.read()`][stream-read] is called.
821+
809822
##### readable.destroy([error])
810823
<!-- YAML
811824
added: v8.0.0
@@ -997,6 +1010,10 @@ the status of the `highWaterMark`.
9971010
##### readable.resume()
9981011
<!-- YAML
9991012
added: v0.9.4
1013+
changes:
1014+
- version: REPLACEME
1015+
pr-url: https://github.com/nodejs/node/pull/18994
1016+
description: Resume has no effect if there is a 'readable' event listening
10001017
-->
10011018

10021019
* Returns: {this}
@@ -1016,6 +1033,9 @@ getReadableStreamSomehow()
10161033
});
10171034
```
10181035

1036+
The `readable.resume()` method has no effect if there is a `'readable'`
1037+
event listener.
1038+
10191039
##### readable.setEncoding(encoding)
10201040
<!-- YAML
10211041
added: v0.9.4

‎lib/_stream_readable.js

+50-7
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ Readable.prototype.unshift = function(chunk) {
223223
};
224224

225225
function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
226+
debug('readableAddChunk', chunk);
226227
var state = stream._readableState;
227228
if (chunk === null) {
228229
state.reading = false;
@@ -799,20 +800,24 @@ Readable.prototype.unpipe = function(dest) {
799800
// Ensure readable listeners eventually get something
800801
Readable.prototype.on = function(ev, fn) {
801802
const res = Stream.prototype.on.call(this, ev, fn);
803+
const state = this._readableState;
802804

803805
if (ev === 'data') {
804-
// Start flowing on next tick if stream isn't explicitly paused
805-
if (this._readableState.flowing !== false)
806+
// update readableListening so that resume() may be a no-op
807+
// a few lines down. This is needed to support once('readable').
808+
state.readableListening = this.listenerCount('readable') > 0;
809+
810+
// Try start flowing on next tick if stream isn't explicitly paused
811+
if (state.flowing !== false)
806812
this.resume();
807813
} else if (ev === 'readable') {
808-
const state = this._readableState;
809814
if (!state.endEmitted && !state.readableListening) {
810815
state.readableListening = state.needReadable = true;
811816
state.emittedReadable = false;
812-
if (!state.reading) {
813-
process.nextTick(nReadingNextTick, this);
814-
} else if (state.length) {
817+
if (state.length) {
815818
emitReadable(this);
819+
} else if (!state.reading) {
820+
process.nextTick(nReadingNextTick, this);
816821
}
817822
}
818823
}
@@ -821,6 +826,42 @@ Readable.prototype.on = function(ev, fn) {
821826
};
822827
Readable.prototype.addListener = Readable.prototype.on;
823828

829+
Readable.prototype.removeListener = function(ev, fn) {
830+
const res = Stream.prototype.removeListener.call(this, ev, fn);
831+
832+
if (ev === 'readable') {
833+
// We need to check if there is someone still listening to
834+
// to readable and reset the state. However this needs to happen
835+
// after readable has been emitted but before I/O (nextTick) to
836+
// support once('readable', fn) cycles. This means that calling
837+
// resume within the same tick will have no
838+
// effect.
839+
process.nextTick(updateReadableListening, this);
840+
}
841+
842+
return res;
843+
};
844+
845+
Readable.prototype.removeAllListeners = function(ev) {
846+
const res = Stream.prototype.removeAllListeners.call(this, ev);
847+
848+
if (ev === 'readable' || ev === undefined) {
849+
// We need to check if there is someone still listening to
850+
// to readable and reset the state. However this needs to happen
851+
// after readable has been emitted but before I/O (nextTick) to
852+
// support once('readable', fn) cycles. This means that calling
853+
// resume within the same tick will have no
854+
// effect.
855+
process.nextTick(updateReadableListening, this);
856+
}
857+
858+
return res;
859+
};
860+
861+
function updateReadableListening(self) {
862+
self._readableState.readableListening = self.listenerCount('readable') > 0;
863+
}
864+
824865
function nReadingNextTick(self) {
825866
debug('readable nexttick read 0');
826867
self.read(0);
@@ -832,7 +873,9 @@ Readable.prototype.resume = function() {
832873
var state = this._readableState;
833874
if (!state.flowing) {
834875
debug('resume');
835-
state.flowing = true;
876+
// we flow only if there is no one listening
877+
// for readable
878+
state.flowing = !state.readableListening;
836879
resume(this, state);
837880
}
838881
return this;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const http = require('http');
6+
const helloWorld = 'Hello World!';
7+
const helloAgainLater = 'Hello again later!';
8+
9+
const server = http.createServer((req, res) => {
10+
res.writeHead(200, {
11+
'Content-Length': '' + (helloWorld.length + helloAgainLater.length)
12+
});
13+
res.write(helloWorld);
14+
15+
// we need to make sure the data is flushed
16+
setTimeout(() => {
17+
res.end(helloAgainLater);
18+
}, common.platformTimeout(10));
19+
}).listen(0, function() {
20+
const opts = {
21+
hostname: 'localhost',
22+
port: server.address().port,
23+
path: '/'
24+
};
25+
26+
const expectedData = [helloWorld, helloAgainLater];
27+
const expectedRead = [helloWorld, null, helloAgainLater, null];
28+
29+
const req = http.request(opts, (res) => {
30+
res.on('error', common.mustNotCall);
31+
32+
res.on('readable', common.mustCall(() => {
33+
let data;
34+
35+
do {
36+
data = res.read();
37+
assert.strictEqual(data, expectedRead.shift());
38+
} while (data !== null);
39+
}, 2));
40+
41+
res.setEncoding('utf8');
42+
res.on('data', common.mustCall((data) => {
43+
assert.strictEqual(data, expectedData.shift());
44+
}, 2));
45+
46+
res.on('end', common.mustCall(() => {
47+
server.close();
48+
}));
49+
});
50+
51+
req.end();
52+
});

‎test/parallel/test-stream-readable-reading-readingMore.js

+143-41
Original file line numberDiff line numberDiff line change
@@ -3,64 +3,166 @@ const common = require('../common');
33
const assert = require('assert');
44
const Readable = require('stream').Readable;
55

6-
const readable = new Readable({
7-
read(size) {}
8-
});
6+
{
7+
const readable = new Readable({
8+
read(size) {}
9+
});
910

10-
const state = readable._readableState;
11+
const state = readable._readableState;
1112

12-
// Starting off with false initially.
13-
assert.strictEqual(state.reading, false);
14-
assert.strictEqual(state.readingMore, false);
13+
// Starting off with false initially.
14+
assert.strictEqual(state.reading, false);
15+
assert.strictEqual(state.readingMore, false);
1516

16-
readable.on('data', common.mustCall((data) => {
17-
// while in a flowing state, should try to read more.
18-
if (readable.readableFlowing)
17+
readable.on('data', common.mustCall((data) => {
18+
// while in a flowing state with a 'readable' listener
19+
// we should not be reading more
20+
if (readable.readableFlowing)
21+
assert.strictEqual(state.readingMore, true);
22+
23+
// reading as long as we've not ended
24+
assert.strictEqual(state.reading, !state.ended);
25+
}, 2));
26+
27+
function onStreamEnd() {
28+
// End of stream; state.reading is false
29+
// And so should be readingMore.
30+
assert.strictEqual(state.readingMore, false);
31+
assert.strictEqual(state.reading, false);
32+
}
33+
34+
readable.on('readable', common.mustCall(() => {
35+
// 'readable' always gets called before 'end'
36+
// since 'end' hasn't been emitted, more data could be incoming
1937
assert.strictEqual(state.readingMore, true);
2038

21-
// reading as long as we've not ended
22-
assert.strictEqual(state.reading, !state.ended);
23-
}, 2));
39+
// if the stream has ended, we shouldn't be reading
40+
assert.strictEqual(state.ended, !state.reading);
41+
42+
const data = readable.read();
43+
if (data === null) // reached end of stream
44+
process.nextTick(common.mustCall(onStreamEnd, 1));
45+
}, 2));
46+
47+
readable.on('end', common.mustCall(onStreamEnd));
48+
readable.push('pushed');
49+
50+
readable.read(6);
51+
52+
// reading
53+
assert.strictEqual(state.reading, true);
54+
assert.strictEqual(state.readingMore, true);
55+
56+
// add chunk to front
57+
readable.unshift('unshifted');
2458

25-
function onStreamEnd() {
26-
// End of stream; state.reading is false
27-
// And so should be readingMore.
59+
// end
60+
readable.push(null);
61+
}
62+
63+
{
64+
const readable = new Readable({
65+
read(size) {}
66+
});
67+
68+
const state = readable._readableState;
69+
70+
// Starting off with false initially.
71+
assert.strictEqual(state.reading, false);
2872
assert.strictEqual(state.readingMore, false);
73+
74+
readable.on('data', common.mustCall((data) => {
75+
// while in a flowing state without a 'readable' listener
76+
// we should be reading more
77+
if (readable.readableFlowing)
78+
assert.strictEqual(state.readingMore, true);
79+
80+
// reading as long as we've not ended
81+
assert.strictEqual(state.reading, !state.ended);
82+
}, 2));
83+
84+
function onStreamEnd() {
85+
// End of stream; state.reading is false
86+
// And so should be readingMore.
87+
assert.strictEqual(state.readingMore, false);
88+
assert.strictEqual(state.reading, false);
89+
}
90+
91+
readable.on('end', common.mustCall(onStreamEnd));
92+
readable.push('pushed');
93+
94+
// stop emitting 'data' events
95+
assert.strictEqual(state.flowing, true);
96+
readable.pause();
97+
98+
// paused
99+
assert.strictEqual(state.reading, false);
100+
assert.strictEqual(state.flowing, false);
101+
102+
readable.resume();
29103
assert.strictEqual(state.reading, false);
104+
assert.strictEqual(state.flowing, true);
105+
106+
// add chunk to front
107+
readable.unshift('unshifted');
108+
109+
// end
110+
readable.push(null);
30111
}
31112

32-
readable.on('readable', common.mustCall(() => {
33-
// 'readable' always gets called before 'end'
34-
// since 'end' hasn't been emitted, more data could be incoming
35-
assert.strictEqual(state.readingMore, true);
113+
{
114+
const readable = new Readable({
115+
read(size) {}
116+
});
36117

37-
// if the stream has ended, we shouldn't be reading
38-
assert.strictEqual(state.ended, !state.reading);
118+
const state = readable._readableState;
39119

40-
const data = readable.read();
41-
if (data === null) // reached end of stream
42-
process.nextTick(common.mustCall(onStreamEnd, 1));
43-
}, 2));
120+
// Starting off with false initially.
121+
assert.strictEqual(state.reading, false);
122+
assert.strictEqual(state.readingMore, false);
123+
124+
const onReadable = common.mustNotCall;
125+
126+
readable.on('readable', onReadable);
44127

45-
readable.on('end', common.mustCall(onStreamEnd));
128+
readable.on('data', common.mustCall((data) => {
129+
// reading as long as we've not ended
130+
assert.strictEqual(state.reading, !state.ended);
131+
}, 2));
46132

47-
readable.push('pushed');
133+
readable.removeListener('readable', onReadable);
48134

49-
// stop emitting 'data' events
50-
readable.pause();
135+
function onStreamEnd() {
136+
// End of stream; state.reading is false
137+
// And so should be readingMore.
138+
assert.strictEqual(state.readingMore, false);
139+
assert.strictEqual(state.reading, false);
140+
}
51141

52-
// read() should only be called while operating in paused mode
53-
readable.read(6);
142+
readable.on('end', common.mustCall(onStreamEnd));
143+
readable.push('pushed');
54144

55-
// reading
56-
assert.strictEqual(state.reading, true);
57-
assert.strictEqual(state.readingMore, true);
145+
// we are still not flowing, we will be resuming in the next tick
146+
assert.strictEqual(state.flowing, false);
58147

59-
// resume emitting 'data' events
60-
readable.resume();
148+
// wait for nextTick, so the readableListener flag resets
149+
process.nextTick(function() {
150+
readable.resume();
61151

62-
// add chunk to front
63-
readable.unshift('unshifted');
152+
// stop emitting 'data' events
153+
assert.strictEqual(state.flowing, true);
154+
readable.pause();
64155

65-
// end
66-
readable.push(null);
156+
// paused
157+
assert.strictEqual(state.flowing, false);
158+
159+
readable.resume();
160+
assert.strictEqual(state.flowing, true);
161+
162+
// add chunk to front
163+
readable.unshift('unshifted');
164+
165+
// end
166+
readable.push(null);
167+
});
168+
}

‎test/parallel/test-stream3-pause-then-read.js

+3
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ let expectEndingData = expectTotalData;
3535
const r = new Readable({ highWaterMark: 1000 });
3636
let chunks = totalChunks;
3737
r._read = function(n) {
38+
console.log('_read called', chunks);
3839
if (!(chunks % 2))
3940
setImmediate(push);
4041
else if (!(chunks % 3))
@@ -49,6 +50,7 @@ function push() {
4950
if (chunk) {
5051
totalPushed += chunk.length;
5152
}
53+
console.log('chunks', chunks);
5254
r.push(chunk);
5355
}
5456

@@ -64,6 +66,7 @@ function readn(n, then) {
6466
expectEndingData -= n;
6567
(function read() {
6668
const c = r.read(n);
69+
console.error('c', c);
6770
if (!c)
6871
r.once('readable', read);
6972
else {

0 commit comments

Comments
 (0)
Please sign in to comment.