Skip to content

Commit 7c4b09b

Browse files
addaleaxBridgeAR
authored andcommitted
src: refactor stream callbacks and ownership
Instead of setting individual callbacks on streams and tracking stream ownership through a boolean `consume_` flag, always have one specific listener object in charge of a stream, and call methods on that object rather than generic C-style callbacks. PR-URL: #18334 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 1b6cb94 commit 7c4b09b

18 files changed

+463
-474
lines changed

lib/_http_server.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ function onSocketPause() {
666666
function unconsume(parser, socket) {
667667
if (socket._handle) {
668668
if (parser._consumed)
669-
parser.unconsume(socket._handle._externalStream);
669+
parser.unconsume();
670670
parser._consumed = false;
671671
socket.removeListener('pause', onSocketPause);
672672
socket.removeListener('resume', onSocketResume);

src/connection_wrap.cc

+1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "connect_wrap.h"
44
#include "env-inl.h"
55
#include "pipe_wrap.h"
6+
#include "stream_base-inl.h"
67
#include "stream_wrap.h"
78
#include "tcp_wrap.h"
89
#include "util-inl.h"

src/js_stream.cc

+7-48
Original file line numberDiff line numberDiff line change
@@ -25,55 +25,13 @@ JSStream::JSStream(Environment* env, Local<Object> obj)
2525
StreamBase(env) {
2626
node::Wrap(obj, this);
2727
MakeWeak<JSStream>(this);
28-
29-
set_alloc_cb({ OnAllocImpl, this });
30-
set_read_cb({ OnReadImpl, this });
3128
}
3229

3330

3431
JSStream::~JSStream() {
3532
}
3633

3734

38-
void JSStream::OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx) {
39-
buf->base = Malloc(size);
40-
buf->len = size;
41-
}
42-
43-
44-
void JSStream::OnReadImpl(ssize_t nread,
45-
const uv_buf_t* buf,
46-
uv_handle_type pending,
47-
void* ctx) {
48-
JSStream* wrap = static_cast<JSStream*>(ctx);
49-
CHECK_NE(wrap, nullptr);
50-
Environment* env = wrap->env();
51-
HandleScope handle_scope(env->isolate());
52-
Context::Scope context_scope(env->context());
53-
54-
if (nread < 0) {
55-
if (buf != nullptr && buf->base != nullptr)
56-
free(buf->base);
57-
wrap->EmitData(nread, Local<Object>(), Local<Object>());
58-
return;
59-
}
60-
61-
if (nread == 0) {
62-
if (buf->base != nullptr)
63-
free(buf->base);
64-
return;
65-
}
66-
67-
CHECK_LE(static_cast<size_t>(nread), buf->len);
68-
char* base = node::Realloc(buf->base, nread);
69-
70-
CHECK_EQ(pending, UV_UNKNOWN_HANDLE);
71-
72-
Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
73-
wrap->EmitData(nread, obj, Local<Object>());
74-
}
75-
76-
7735
AsyncWrap* JSStream::GetAsyncWrap() {
7836
return static_cast<AsyncWrap*>(this);
7937
}
@@ -212,26 +170,27 @@ void JSStream::ReadBuffer(const FunctionCallbackInfo<Value>& args) {
212170
char* data = Buffer::Data(args[0]);
213171
int len = Buffer::Length(args[0]);
214172

215-
do {
216-
uv_buf_t buf;
173+
// Repeatedly ask the stream's owner for memory, copy the data that we
174+
// just read from JS into those buffers and emit them as reads.
175+
while (len != 0) {
176+
uv_buf_t buf = wrap->EmitAlloc(len);
217177
ssize_t avail = len;
218-
wrap->EmitAlloc(len, &buf);
219178
if (static_cast<ssize_t>(buf.len) < avail)
220179
avail = buf.len;
221180

222181
memcpy(buf.base, data, avail);
223182
data += avail;
224183
len -= avail;
225-
wrap->EmitRead(avail, &buf);
226-
} while (len != 0);
184+
wrap->EmitRead(avail, buf);
185+
}
227186
}
228187

229188

230189
void JSStream::EmitEOF(const FunctionCallbackInfo<Value>& args) {
231190
JSStream* wrap;
232191
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
233192

234-
wrap->EmitRead(UV_EOF, nullptr);
193+
wrap->EmitRead(UV_EOF);
235194
}
236195

237196

0 commit comments

Comments
 (0)