forked from nodejs/node
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathnode_quic_stream.cc
513 lines (449 loc) · 17.2 KB
/
node_quic_stream.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
#include "node_quic_stream-inl.h" // NOLINT(build/include)
#include "async_wrap-inl.h"
#include "debug_utils-inl.h"
#include "env-inl.h"
#include "node.h"
#include "node_buffer.h"
#include "node_internals.h"
#include "stream_base-inl.h"
#include "node_sockaddr-inl.h"
#include "node_http_common-inl.h"
#include "node_quic_session-inl.h"
#include "node_quic_socket-inl.h"
#include "node_quic_util-inl.h"
#include "v8.h"
#include "uv.h"
#include <array>
#include <algorithm>
#include <limits>
#include <string>
namespace node {
using v8::Array;
using v8::Context;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::Isolate;
using v8::Local;
using v8::Object;
using v8::ObjectTemplate;
using v8::String;
using v8::Value;
namespace quic {
QuicStream::QuicStream(
QuicSession* sess,
Local<Object> wrap,
int64_t stream_id,
int64_t push_id)
: AsyncWrap(sess->env(), wrap, AsyncWrap::PROVIDER_QUICSTREAM),
StreamBase(sess->env()),
StatsBase(sess->env(), wrap,
HistogramOptions::ACK |
HistogramOptions::RATE |
HistogramOptions::SIZE),
session_(sess),
stream_id_(stream_id),
push_id_(push_id),
quic_state_(sess->quic_state()) {
CHECK_NOT_NULL(sess);
Debug(this, "Created");
StreamBase::AttachToObject(GetObject());
ngtcp2_transport_params params;
ngtcp2_conn_get_local_transport_params(session()->connection(), ¶ms);
IncrementStat(&QuicStreamStats::max_offset, params.initial_max_data);
}
QuicStream::~QuicStream() {
DebugStats();
}
template <typename Fn>
void QuicStreamStatsTraits::ToString(const QuicStream& ptr, Fn&& add_field) {
#define V(_n, name, label) \
add_field(label, ptr.GetStat(&QuicStreamStats::name));
STREAM_STATS(V)
#undef V
}
// Acknowledge is called when ngtcp2 has received an acknowledgement
// for one or more stream frames for this QuicStream. This will cause
// data stored in the streambuf_ outbound queue to be consumed and may
// result in the JavaScript callback for the write to be invoked.
void QuicStream::Acknowledge(uint64_t offset, size_t datalen) {
if (is_destroyed())
return;
// ngtcp2 guarantees that offset must always be greater
// than the previously received offset, but let's just
// make sure that holds.
CHECK_GE(offset, GetStat(&QuicStreamStats::max_offset_ack));
SetStat(&QuicStreamStats::max_offset_ack, offset);
Debug(this, "Acknowledging %d bytes", datalen);
// Consumes the given number of bytes in the buffer. This may
// have the side-effect of causing the onwrite callback to be
// invoked if a complete chunk of buffered data has been acknowledged.
streambuf_.Consume(datalen);
RecordAck(&QuicStreamStats::acked_at);
}
// While not all QUIC applications will support headers, QuicStream
// includes basic, generic support for storing them.
bool QuicStream::AddHeader(std::unique_ptr<QuicHeader> header) {
size_t len = header->length();
QuicApplication* app = session()->application();
// We cannot add the header if we've either reached
// * the max number of header pairs or
// * the max number of header bytes
if (headers_.size() == app->max_header_pairs() ||
current_headers_length_ + len > app->max_header_length()) {
return false;
}
current_headers_length_ += header->length();
Debug(this, "Header - %s", header.get());
headers_.emplace_back(std::move(header));
return true;
}
std::string QuicStream::diagnostic_name() const {
return std::string("QuicStream ") + std::to_string(stream_id_) +
" (" + std::to_string(static_cast<int64_t>(get_async_id())) +
", " + session_->diagnostic_name() + ")";
}
void QuicStream::Destroy() {
if (is_destroyed())
return;
set_flag(QUICSTREAM_FLAG_DESTROYED);
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
streambuf_.End();
// If there is data currently buffered in the streambuf_,
// then cancel will call out to invoke an arbitrary
// JavaScript callback (the on write callback). Within
// that callback, however, the QuicStream will no longer
// be usable to send or receive data.
streambuf_.Cancel();
CHECK_EQ(streambuf_.length(), 0);
// The QuicSession maintains a map of std::unique_ptrs to
// QuicStream instances. Removing this here will cause
// this QuicStream object to be deconstructed, so the
// QuicStream object will no longer exist after this point.
session_->RemoveStream(stream_id_);
}
// Do shutdown is called when the JS stream writable side is closed.
// If we're not within an ngtcp2 callback, this will trigger the
// QuicSession to send any pending data. Any time after this is
// called, a final stream frame will be sent for this QuicStream,
// but it may not be sent right away.
int QuicStream::DoShutdown(ShutdownWrap* req_wrap) {
if (is_destroyed())
return UV_EPIPE;
QuicSession::SendSessionScope send_scope(session(), true);
if (is_writable()) {
Debug(this, "Shutdown writable side");
RecordTimestamp(&QuicStreamStats::closing_at);
streambuf_.End();
session()->ResumeStream(stream_id_);
}
return 1;
}
int QuicStream::DoWrite(
WriteWrap* req_wrap,
uv_buf_t* bufs,
size_t nbufs,
uv_stream_t* send_handle) {
CHECK_NULL(send_handle);
// A write should not have happened if we've been destroyed or
// the QuicStream is no longer (or was never) writable.
if (is_destroyed() || !is_writable()) {
req_wrap->Done(UV_EPIPE);
return 0;
}
// Nothing to write.
size_t length = get_length(bufs, nbufs);
if (length == 0) {
req_wrap->Done(0);
return 0;
}
QuicSession::SendSessionScope send_scope(session(), true);
Debug(this, "Queuing %" PRIu64 " bytes of data from %d buffers",
length, nbufs);
IncrementStat(&QuicStreamStats::bytes_sent, static_cast<uint64_t>(length));
BaseObjectPtr<AsyncWrap> strong_ref{req_wrap->GetAsyncWrap()};
// The list of buffers will be appended onto streambuf_ without
// copying. Those will remain in the buffer until the serialized
// stream frames are acknowledged.
// This callback function will be invoked once this
// complete batch of buffers has been acknowledged
// by the peer. This will have the side effect of
// blocking additional pending writes from the
// javascript side, so writing data to the stream
// will be throttled by how quickly the peer is
// able to acknowledge stream packets. This is good
// in the sense of providing back-pressure, but
// also means that writes will be significantly
// less performant unless written in batches.
streambuf_.Push(
bufs,
nbufs,
[req_wrap, strong_ref](int status) {
req_wrap->Done(status);
});
session()->ResumeStream(stream_id_);
return 0;
}
bool QuicStream::IsAlive() {
return !is_destroyed() && !IsClosing();
}
bool QuicStream::IsClosing() {
return !is_writable() && !is_readable();
}
int QuicStream::ReadStart() {
CHECK(!is_destroyed());
CHECK(is_readable());
set_flag(QUICSTREAM_FLAG_READ_STARTED);
set_flag(QUICSTREAM_FLAG_READ_PAUSED, false);
IncrementStat(
&QuicStreamStats::max_offset,
inbound_consumed_data_while_paused_);
session_->ExtendStreamOffset(id(), inbound_consumed_data_while_paused_);
return 0;
}
int QuicStream::ReadStop() {
CHECK(!is_destroyed());
CHECK(is_readable());
set_flag(QUICSTREAM_FLAG_READ_PAUSED);
return 0;
}
void QuicStream::IncrementStats(size_t datalen) {
uint64_t len = static_cast<uint64_t>(datalen);
IncrementStat(&QuicStreamStats::bytes_received, len);
RecordRate(&QuicStreamStats::received_at);
RecordSize(len);
}
void QuicStream::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("buffer", &streambuf_);
StatsBase::StatsMemoryInfo(tracker);
tracker->TrackField("headers", headers_);
}
BaseObjectPtr<QuicStream> QuicStream::New(
QuicSession* session,
int64_t stream_id,
int64_t push_id) {
Local<Object> obj;
if (!session->env()
->quicserverstream_instance_template()
->NewInstance(session->env()->context()).ToLocal(&obj)) {
return {};
}
BaseObjectPtr<QuicStream> stream =
MakeDetachedBaseObject<QuicStream>(
session,
obj,
stream_id,
push_id);
CHECK(stream);
session->AddStream(stream);
return stream;
}
// Passes chunks of data on to the JavaScript side as soon as they are
// received but only if we're still readable. The caller of this must have a
// HandleScope.
//
// Note that this is pushing data to the JS side regardless of whether
// anything is listening. For flow-control, we only send window updates
// to the sending peer if the stream is in flowing mode, so the sender
// should not be sending too much data.
void QuicStream::ReceiveData(
int fin,
const uint8_t* data,
size_t datalen,
uint64_t offset) {
CHECK(!is_destroyed());
Debug(this, "Receiving %d bytes. Final? %s. Readable? %s",
datalen,
fin ? "yes" : "no",
is_readable() ? "yes" : "no");
// If the QuicStream is not (or was never) readable, just ignore the chunk.
if (!is_readable())
return;
// ngtcp2 guarantees that datalen will only be 0 if fin is set.
// Let's just make sure.
CHECK(datalen > 0 || fin == 1);
// ngtcp2 guarantees that offset is always greater than the previously
// received offset. Let's just make sure.
CHECK_GE(offset, GetStat(&QuicStreamStats::max_offset_received));
SetStat(&QuicStreamStats::max_offset_received, offset);
if (datalen > 0) {
// IncrementStats will update the data_rx_rate_ and data_rx_size_
// histograms. These will provide data necessary to detect and
// prevent Slow Send DOS attacks specifically by allowing us to
// see if a connection is sending very small chunks of data at very
// slow speeds. It is important to emphasize, however, that slow send
// rates may be perfectly legitimate so we cannot simply take blanket
// action when slow rates are detected. Nor can we reliably define what
// a slow rate even is! Will will need to determine some reasonable
// default and allow user code to change the default as well as determine
// what action to take. The current strategy will be to trigger an event
// on the stream when data transfer rates are likely to be considered too
// slow.
IncrementStats(datalen);
while (datalen > 0) {
uv_buf_t buf = EmitAlloc(datalen);
size_t avail = std::min(static_cast<size_t>(buf.len), datalen);
// For now, we're allocating and copying. Once we determine if we can
// safely switch to a non-allocated mode like we do with http2 streams,
// we can make this branch more efficient by using the LIKELY
// optimization. The way ngtcp2 currently works, however, we have
// to memcpy here.
if (UNLIKELY(buf.base == nullptr))
buf.base = reinterpret_cast<char*>(const_cast<uint8_t*>(data));
else
memcpy(buf.base, data, avail);
data += avail;
datalen -= avail;
// Capture read_paused before EmitRead in case user code callbacks
// alter the state when EmitRead is called.
bool read_paused = is_flag_set(QUICSTREAM_FLAG_READ_PAUSED);
EmitRead(avail, buf);
// Reading can be paused while we are processing. If that's
// the case, we still want to acknowledge the current bytes
// so that pausing does not throw off our flow control.
if (read_paused) {
inbound_consumed_data_while_paused_ += avail;
} else {
IncrementStat(&QuicStreamStats::max_offset, avail);
session_->ExtendStreamOffset(id(), avail);
}
}
}
// When fin != 0, we've received that last chunk of data for this
// stream, indicating that the stream will no longer be readable.
if (fin) {
set_flag(QUICSTREAM_FLAG_FIN);
set_final_size(offset + datalen);
EmitRead(UV_EOF);
}
}
int QuicStream::DoPull(
bob::Next<ngtcp2_vec> next,
int options,
ngtcp2_vec* data,
size_t count,
size_t max_count_hint) {
return streambuf_.Pull(
std::move(next),
options,
data,
count,
max_count_hint);
}
// JavaScript API
namespace {
void QuicStreamGetID(const FunctionCallbackInfo<Value>& args) {
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
args.GetReturnValue().Set(static_cast<double>(stream->id()));
}
void OpenUnidirectionalStream(const FunctionCallbackInfo<Value>& args) {
CHECK(!args.IsConstructCall());
CHECK(args[0]->IsObject());
QuicSession* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args[0].As<Object>());
int64_t stream_id;
if (!session->OpenUnidirectionalStream(&stream_id))
return;
BaseObjectPtr<QuicStream> stream = QuicStream::New(session, stream_id);
args.GetReturnValue().Set(stream->object());
}
void OpenBidirectionalStream(const FunctionCallbackInfo<Value>& args) {
CHECK(!args.IsConstructCall());
CHECK(args[0]->IsObject());
QuicSession* session;
ASSIGN_OR_RETURN_UNWRAP(&session, args[0].As<Object>());
int64_t stream_id;
if (!session->OpenBidirectionalStream(&stream_id))
return;
BaseObjectPtr<QuicStream> stream = QuicStream::New(session, stream_id);
args.GetReturnValue().Set(stream->object());
}
void QuicStreamDestroy(const FunctionCallbackInfo<Value>& args) {
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
stream->Destroy();
}
void QuicStreamReset(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
QuicError error(env, args[0], args[1], QUIC_ERROR_APPLICATION);
stream->ResetStream(
error.family == QUIC_ERROR_APPLICATION ?
error.code : static_cast<uint64_t>(NGTCP2_NO_ERROR));
}
// Requests transmission of a block of informational headers. Not all
// QUIC Applications will support headers. If headers are not supported,
// This will set the return value to false, otherwise the return value
// is set to true
void QuicStreamSubmitInformation(const FunctionCallbackInfo<Value>& args) {
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
CHECK(args[0]->IsArray());
args.GetReturnValue().Set(stream->SubmitInformation(args[0].As<Array>()));
}
// Requests transmission of a block of initial headers. Not all
// QUIC Applications will support headers. If headers are not supported,
// this will set the return value to false, otherwise the return value
// is set to true. For http/3, these may be request or response headers.
void QuicStreamSubmitHeaders(const FunctionCallbackInfo<Value>& args) {
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
CHECK(args[0]->IsArray());
uint32_t flags = QUICSTREAM_HEADER_FLAGS_NONE;
CHECK(args[1]->Uint32Value(stream->env()->context()).To(&flags));
args.GetReturnValue().Set(stream->SubmitHeaders(args[0].As<Array>(), flags));
}
// Requests transmission of a block of trailing headers. Not all
// QUIC Applications will support headers. If headers are not supported,
// this will set the return value to false, otherwise the return value
// is set to true.
void QuicStreamSubmitTrailers(const FunctionCallbackInfo<Value>& args) {
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
CHECK(args[0]->IsArray());
args.GetReturnValue().Set(stream->SubmitTrailers(args[0].As<Array>()));
}
// Requests creation of a push stream. Not all QUIC Applications will
// support push streams. If pushes are not supported, the return value
// will be undefined, otherwise the return value will be the created
// QuicStream representing the push.
void QuicStreamSubmitPush(const FunctionCallbackInfo<Value>& args) {
QuicStream* stream;
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
CHECK(args[0]->IsArray());
BaseObjectPtr<QuicStream> push_stream =
stream->SubmitPush(args[0].As<Array>());
if (push_stream)
args.GetReturnValue().Set(push_stream->object());
}
} // namespace
void QuicStream::Initialize(
Environment* env,
Local<Object> target,
Local<Context> context) {
Isolate* isolate = env->isolate();
Local<String> class_name = FIXED_ONE_BYTE_STRING(isolate, "QuicStream");
Local<FunctionTemplate> stream = FunctionTemplate::New(env->isolate());
stream->SetClassName(class_name);
stream->Inherit(AsyncWrap::GetConstructorTemplate(env));
StreamBase::AddMethods(env, stream);
Local<ObjectTemplate> streamt = stream->InstanceTemplate();
streamt->SetInternalFieldCount(StreamBase::kInternalFieldCount);
streamt->Set(env->owner_symbol(), Null(env->isolate()));
env->SetProtoMethod(stream, "destroy", QuicStreamDestroy);
env->SetProtoMethod(stream, "resetStream", QuicStreamReset);
env->SetProtoMethod(stream, "id", QuicStreamGetID);
env->SetProtoMethod(stream, "submitInformation", QuicStreamSubmitInformation);
env->SetProtoMethod(stream, "submitHeaders", QuicStreamSubmitHeaders);
env->SetProtoMethod(stream, "submitTrailers", QuicStreamSubmitTrailers);
env->SetProtoMethod(stream, "submitPush", QuicStreamSubmitPush);
env->set_quicserverstream_instance_template(streamt);
target->Set(env->context(),
class_name,
stream->GetFunction(env->context()).ToLocalChecked()).Check();
env->SetMethod(target, "openBidirectionalStream", OpenBidirectionalStream);
env->SetMethod(target, "openUnidirectionalStream", OpenUnidirectionalStream);
}
} // namespace quic
} // namespace node