Skip to content

Commit 5a2cbac

Browse files
committed
changed net/connect to be non-blocking / asynchronous
1 parent 398833e commit 5a2cbac

File tree

3 files changed

+67
-12
lines changed

3 files changed

+67
-12
lines changed

src/core/ev.c

+44-1
Original file line numberDiff line numberDiff line change
@@ -2456,7 +2456,8 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
24562456
typedef enum {
24572457
JANET_ASYNC_WRITEMODE_WRITE,
24582458
JANET_ASYNC_WRITEMODE_SEND,
2459-
JANET_ASYNC_WRITEMODE_SENDTO
2459+
JANET_ASYNC_WRITEMODE_SENDTO,
2460+
JANET_ASYNC_WRITEMODE_CONNECT
24602461
} JanetWriteMode;
24612462

24622463
typedef struct {
@@ -2509,6 +2510,25 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
25092510
}
25102511
break;
25112512
case JANET_ASYNC_EVENT_USER: {
2513+
#ifdef JANET_NET
2514+
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
2515+
int res = 0;
2516+
int size = sizeof res;
2517+
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
2518+
if (r == 0) {
2519+
if (res == 0) {
2520+
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
2521+
} else {
2522+
// TODO JSOCKCLOSE(s->sock) but is in net.c;
2523+
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
2524+
}
2525+
return JANET_ASYNC_STATUS_DONE;
2526+
} else {
2527+
// TODO panic? assert?
2528+
return JANET_ASYNC_STATUS_NOT_DONE;
2529+
}
2530+
}
2531+
#endif
25122532
/* Begin write */
25132533
int32_t len;
25142534
const uint8_t *bytes;
@@ -2572,6 +2592,25 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
25722592
janet_cancel(s->fiber, janet_cstringv("stream hup"));
25732593
return JANET_ASYNC_STATUS_DONE;
25742594
case JANET_ASYNC_EVENT_WRITE: {
2595+
#ifdef JANET_NET
2596+
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
2597+
int res = 0;
2598+
socklen_t size = sizeof res;
2599+
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
2600+
if (r == 0) {
2601+
if (res == 0) {
2602+
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
2603+
} else {
2604+
// TODO JSOCKCLOSE(s->sock) but is in net.c;
2605+
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
2606+
}
2607+
return JANET_ASYNC_STATUS_DONE;
2608+
} else {
2609+
// TODO panic? assert?
2610+
return JANET_ASYNC_STATUS_NOT_DONE;
2611+
}
2612+
}
2613+
#endif
25752614
int32_t start, len;
25762615
const uint8_t *bytes;
25772616
start = state->start;
@@ -2674,6 +2713,10 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
26742713
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
26752714
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
26762715
}
2716+
2717+
void janet_ev_connect(JanetStream *stream, int flags) {
2718+
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
2719+
}
26772720
#endif
26782721

26792722
/* For a pipe ID */

src/core/net.c

+22-11
Original file line numberDiff line numberDiff line change
@@ -477,32 +477,43 @@ JANET_CORE_FN(cfun_net_connect,
477477
}
478478
}
479479

480+
/* Wrap socket in abstract type JanetStream */
481+
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
482+
483+
/* Set the socket to non-blocking mode */
484+
janet_net_socknoblock(sock);
485+
480486
/* Connect to socket */
481487
#ifdef JANET_WINDOWS
482488
int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
483-
Janet lasterr = janet_ev_lasterr();
489+
int err = WSAGetLastError();
484490
freeaddrinfo(ai);
485491
#else
486492
int status = connect(sock, addr, addrlen);
487-
Janet lasterr = janet_ev_lasterr();
493+
int err = errno;
488494
if (is_unix) {
489495
janet_free(ai);
490496
} else {
491497
freeaddrinfo(ai);
492498
}
493499
#endif
494500

495-
if (status == -1) {
496-
JSOCKCLOSE(sock);
497-
janet_panicf("could not connect socket: %V", lasterr);
501+
if (status != 0) {
502+
#ifdef JANET_WINDOWS
503+
if(err != WSAEWOULDBLOCK) {
504+
#else
505+
if(err != EINPROGRESS) {
506+
#endif
507+
JSOCKCLOSE(sock);
508+
Janet lasterr = janet_ev_lasterr();
509+
janet_panicf("could not connect socket: %V", lasterr);
510+
}
498511
}
512+
513+
/* Handle the connect() result in the event loop*/
514+
janet_ev_connect(stream, MSG_NOSIGNAL);
499515

500-
/* Set up the socket for non-blocking IO after connect - TODO - non-blocking connect? */
501-
janet_net_socknoblock(sock);
502-
503-
/* Wrap socket in abstract type JanetStream */
504-
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
505-
return janet_wrap_abstract(stream);
516+
janet_await();
506517
}
507518

508519
static const char *serverify_socket(JSock sfd) {

src/include/janet.h

+1
Original file line numberDiff line numberDiff line change
@@ -1479,6 +1479,7 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
14791479
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
14801480
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
14811481
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
1482+
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
14821483
#endif
14831484

14841485
/* Write async to a stream */

0 commit comments

Comments
 (0)