@@ -746,7 +746,8 @@ class Http2Session extends EventEmitter {
746
746
shutdown : false ,
747
747
shuttingDown : false ,
748
748
pendingAck : 0 ,
749
- maxPendingAck : Math . max ( 1 , ( options . maxPendingAck | 0 ) || 10 )
749
+ maxPendingAck : Math . max ( 1 , ( options . maxPendingAck | 0 ) || 10 ) ,
750
+ writeQueueSize : 0
750
751
} ;
751
752
752
753
this [ kType ] = type ;
@@ -1080,6 +1081,22 @@ class Http2Session extends EventEmitter {
1080
1081
}
1081
1082
1082
1083
_onTimeout ( ) {
1084
+ // This checks whether a write is currently in progress and also whether
1085
+ // that write is actually sending data across the write. The kHandle
1086
+ // stored `chunksSentSinceLastWrite` is only updated when a timeout event
1087
+ // happens, meaning that if a write is ongoing it should never equal the
1088
+ // newly fetched, updated value.
1089
+ if ( this [ kState ] . writeQueueSize > 0 ) {
1090
+ const handle = this [ kHandle ] ;
1091
+ const chunksSentSinceLastWrite = handle !== undefined ?
1092
+ handle . chunksSentSinceLastWrite : null ;
1093
+ if ( chunksSentSinceLastWrite !== null &&
1094
+ chunksSentSinceLastWrite !== handle . updateChunksSent ( ) ) {
1095
+ _unrefActive ( this ) ;
1096
+ return ;
1097
+ }
1098
+ }
1099
+
1083
1100
process . nextTick ( emit , this , 'timeout' ) ;
1084
1101
}
1085
1102
}
@@ -1199,8 +1216,27 @@ function createWriteReq(req, handle, data, encoding) {
1199
1216
}
1200
1217
}
1201
1218
1219
+ function trackWriteState ( stream , bytes ) {
1220
+ const session = stream [ kSession ] ;
1221
+ stream [ kState ] . writeQueueSize += bytes ;
1222
+ session [ kState ] . writeQueueSize += bytes ;
1223
+ session [ kHandle ] . chunksSentSinceLastWrite = 0 ;
1224
+ }
1225
+
1202
1226
function afterDoStreamWrite ( status , handle , req ) {
1203
- _unrefActive ( handle [ kOwner ] ) ;
1227
+ const session = handle [ kOwner ] ;
1228
+ _unrefActive ( session ) ;
1229
+
1230
+ const state = session [ kState ] ;
1231
+ const { bytes } = req ;
1232
+ state . writeQueueSize -= bytes ;
1233
+
1234
+ const stream = state . streams . get ( req . stream ) ;
1235
+ if ( stream !== undefined ) {
1236
+ _unrefActive ( stream ) ;
1237
+ stream [ kState ] . writeQueueSize -= bytes ;
1238
+ }
1239
+
1204
1240
if ( typeof req . callback === 'function' )
1205
1241
req . callback ( ) ;
1206
1242
this . handle = undefined ;
@@ -1312,7 +1348,8 @@ class Http2Stream extends Duplex {
1312
1348
headersSent : false ,
1313
1349
headRequest : false ,
1314
1350
aborted : false ,
1315
- closeHandler : onSessionClose . bind ( this )
1351
+ closeHandler : onSessionClose . bind ( this ) ,
1352
+ writeQueueSize : 0
1316
1353
} ;
1317
1354
1318
1355
this . once ( 'ready' , streamOnceReady ) ;
@@ -1359,6 +1396,23 @@ class Http2Stream extends Duplex {
1359
1396
}
1360
1397
1361
1398
_onTimeout ( ) {
1399
+ // This checks whether a write is currently in progress and also whether
1400
+ // that write is actually sending data across the write. The kHandle
1401
+ // stored `chunksSentSinceLastWrite` is only updated when a timeout event
1402
+ // happens, meaning that if a write is ongoing it should never equal the
1403
+ // newly fetched, updated value.
1404
+ if ( this [ kState ] . writeQueueSize > 0 ) {
1405
+ const handle = this [ kSession ] [ kHandle ] ;
1406
+ const chunksSentSinceLastWrite = handle !== undefined ?
1407
+ handle . chunksSentSinceLastWrite : null ;
1408
+ if ( chunksSentSinceLastWrite !== null &&
1409
+ chunksSentSinceLastWrite !== handle . updateChunksSent ( ) ) {
1410
+ _unrefActive ( this ) ;
1411
+ _unrefActive ( this [ kSession ] ) ;
1412
+ return ;
1413
+ }
1414
+ }
1415
+
1362
1416
process . nextTick ( emit , this , 'timeout' ) ;
1363
1417
}
1364
1418
@@ -1396,10 +1450,11 @@ class Http2Stream extends Duplex {
1396
1450
this . once ( 'ready' , this . _write . bind ( this , data , encoding , cb ) ) ;
1397
1451
return ;
1398
1452
}
1399
- _unrefActive ( this ) ;
1400
1453
if ( ! this [ kState ] . headersSent )
1401
1454
this [ kProceed ] ( ) ;
1402
1455
const session = this [ kSession ] ;
1456
+ _unrefActive ( this ) ;
1457
+ _unrefActive ( session ) ;
1403
1458
const handle = session [ kHandle ] ;
1404
1459
const req = new WriteWrap ( ) ;
1405
1460
req . stream = this [ kID ] ;
@@ -1410,18 +1465,19 @@ class Http2Stream extends Duplex {
1410
1465
const err = createWriteReq ( req , handle , data , encoding ) ;
1411
1466
if ( err )
1412
1467
throw util . _errnoException ( err , 'write' , req . error ) ;
1413
- this . _bytesDispatched += req . bytes ;
1468
+ trackWriteState ( this , req . bytes ) ;
1414
1469
}
1415
1470
1416
1471
_writev ( data , cb ) {
1417
1472
if ( this [ kID ] === undefined ) {
1418
1473
this . once ( 'ready' , this . _writev . bind ( this , data , cb ) ) ;
1419
1474
return ;
1420
1475
}
1421
- _unrefActive ( this ) ;
1422
1476
if ( ! this [ kState ] . headersSent )
1423
1477
this [ kProceed ] ( ) ;
1424
1478
const session = this [ kSession ] ;
1479
+ _unrefActive ( this ) ;
1480
+ _unrefActive ( session ) ;
1425
1481
const handle = session [ kHandle ] ;
1426
1482
const req = new WriteWrap ( ) ;
1427
1483
req . stream = this [ kID ] ;
@@ -1438,6 +1494,7 @@ class Http2Stream extends Duplex {
1438
1494
const err = handle . writev ( req , chunks ) ;
1439
1495
if ( err )
1440
1496
throw util . _errnoException ( err , 'write' , req . error ) ;
1497
+ trackWriteState ( this , req . bytes ) ;
1441
1498
}
1442
1499
1443
1500
_read ( nread ) {
@@ -1531,6 +1588,10 @@ class Http2Stream extends Duplex {
1531
1588
return ;
1532
1589
}
1533
1590
1591
+ const state = this [ kState ] ;
1592
+ session [ kState ] . writeQueueSize -= state . writeQueueSize ;
1593
+ state . writeQueueSize = 0 ;
1594
+
1534
1595
const server = session [ kServer ] ;
1535
1596
if ( server !== undefined && err ) {
1536
1597
server . emit ( 'streamError' , err , this ) ;
@@ -1625,7 +1686,12 @@ function processRespondWithFD(fd, headers, offset = 0, length = -1,
1625
1686
if ( ret < 0 ) {
1626
1687
err = new NghttpError ( ret ) ;
1627
1688
process . nextTick ( emit , this , 'error' , err ) ;
1689
+ break ;
1628
1690
}
1691
+ // exact length of the file doesn't matter here, since the
1692
+ // stream is closing anyway — just use 1 to signify that
1693
+ // a write does exist
1694
+ trackWriteState ( this , 1 ) ;
1629
1695
}
1630
1696
}
1631
1697
0 commit comments