Skip to content

Commit 5f6372b

Browse files
committed
http2: fix stream reading resumption
_read should always resume the underlying code that is attempting to push data to a readable stream. Adjust http2 core code to resume its reading appropriately. Fixes: nodejs#16578
1 parent 1cdcab0 commit 5f6372b

File tree

5 files changed

+62
-5
lines changed

5 files changed

+62
-5
lines changed

lib/internal/http2/core.js

+1-4
Original file line numberDiff line numberDiff line change
@@ -1276,8 +1276,6 @@ function onStreamClosed(code) {
12761276
}
12771277

12781278
function streamOnResume() {
1279-
if (this._paused)
1280-
return this.pause();
12811279
if (this[kID] === undefined) {
12821280
this.once('ready', streamOnResume);
12831281
return;
@@ -1301,8 +1299,7 @@ function streamOnPause() {
13011299

13021300
function streamOnDrain() {
13031301
const needPause = 0 > this._writableState.highWaterMark;
1304-
if (this._paused && !needPause) {
1305-
this._paused = false;
1302+
if (!needPause) {
13061303
this.resume();
13071304
}
13081305
}

src/node_http2.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ void Http2Session::FlushData(const FunctionCallbackInfo<Value>& args) {
756756
if (!(stream = session->FindStream(id))) {
757757
return args.GetReturnValue().Set(NGHTTP2_ERR_INVALID_STREAM_ID);
758758
}
759-
stream->FlushDataChunks();
759+
stream->ReadResume();
760760
}
761761

762762
void Http2Session::UpdateChunksSent(const FunctionCallbackInfo<Value>& args) {

src/node_http2_core-inl.h

+8
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,14 @@ inline void Nghttp2Stream::ReadStart() {
896896
FlushDataChunks();
897897
}
898898

899+
inline void Nghttp2Stream::ReadResume() {
900+
DEBUG_HTTP2("Nghttp2Stream %d: resume reading\n", id_);
901+
flags_ &= ~NGHTTP2_STREAM_FLAG_READ_PAUSED;
902+
903+
// Flush any queued data chunks immediately out to the JS layer
904+
FlushDataChunks();
905+
}
906+
899907
inline void Nghttp2Stream::ReadStop() {
900908
DEBUG_HTTP2("Nghttp2Stream %d: stop reading\n", id_);
901909
if (!IsReading())

src/node_http2_core.h

+3
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,9 @@ class Nghttp2Stream {
384384
// the session to be emitted at the JS side
385385
inline void ReadStart();
386386

387+
// Resume Reading
388+
inline void ReadResume();
389+
387390
// Stop/Pause Reading.
388391
inline void ReadStop();
389392

test/parallel/test-http2-pipe.js

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
if (!common.hasCrypto)
5+
common.skip('missing crypto');
6+
const fixtures = require('../common/fixtures');
7+
const assert = require('assert');
8+
const http2 = require('http2');
9+
const fs = require('fs');
10+
const path = require('path');
11+
12+
// piping should work as expected with createWriteStream
13+
14+
const loc = fixtures.path('url-tests.js');
15+
const fn = path.join(common.tmpDir, 'http2-url-tests.js');
16+
common.refreshTmpDir();
17+
18+
const server = http2.createServer();
19+
20+
server.on('stream', common.mustCall((stream) => {
21+
const dest = stream.pipe(fs.createWriteStream(fn));
22+
dest.on('finish', common.mustCall(() => {
23+
assert.deepStrictEqual(fs.readFileSync(loc), fs.readFileSync(fn));
24+
fs.unlinkSync(fn);
25+
stream.respond();
26+
stream.end();
27+
}));
28+
}));
29+
30+
server.listen(0, common.mustCall(() => {
31+
const port = server.address().port;
32+
const client = http2.connect(`http://localhost:${port}`);
33+
34+
let remaining = 2;
35+
function maybeClose() {
36+
if (--remaining === 0) {
37+
server.close();
38+
client.destroy();
39+
}
40+
}
41+
42+
const req = client.request({ ':method': 'POST' });
43+
req.on('response', common.mustCall());
44+
req.resume();
45+
req.on('end', common.mustCall(maybeClose));
46+
const str = fs.createReadStream(loc);
47+
str.on('end', common.mustCall(maybeClose));
48+
str.pipe(req);
49+
}));

0 commit comments

Comments
 (0)