Skip to content

Commit b621d4d

Browse files
authored
Merge pull request #1139 from zevv/async-connect
changed net/connect to be non-blocking / asynchronous
2 parents f2197fa + 89debac commit b621d4d

File tree

3 files changed

+81
-11
lines changed

3 files changed

+81
-11
lines changed

src/core/ev.c

+58-1
Original file line numberDiff line numberDiff line change
@@ -1502,6 +1502,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp to) {
15021502
state = state->_next;
15031503
}
15041504
}
1505+
/* Close the stream if requested and no more listeners are left */
1506+
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
1507+
janet_stream_close(stream);
1508+
}
15051509
}
15061510
}
15071511
}
@@ -1656,6 +1660,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
16561660
janet_unlisten(state, 0);
16571661
state = next_state;
16581662
}
1663+
/* Close the stream if requested and no more listeners are left */
1664+
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
1665+
janet_stream_close(stream);
1666+
}
16591667
}
16601668
}
16611669
}
@@ -1854,6 +1862,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
18541862

18551863
state = next_state;
18561864
}
1865+
/* Close the stream if requested and no more listeners are left */
1866+
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
1867+
janet_stream_close(stream);
1868+
}
18571869
}
18581870
}
18591871
}
@@ -1957,6 +1969,7 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
19571969
JanetAsyncStatus status3 = JANET_ASYNC_STATUS_NOT_DONE;
19581970
JanetAsyncStatus status4 = JANET_ASYNC_STATUS_NOT_DONE;
19591971
state->event = pfd;
1972+
JanetStream *stream = state->stream;
19601973
if (mask & POLLOUT)
19611974
status1 = state->machine(state, JANET_ASYNC_EVENT_WRITE);
19621975
if (mask & POLLIN)
@@ -1970,6 +1983,10 @@ void janet_loop1_impl(int has_timeout, JanetTimestamp timeout) {
19701983
status3 == JANET_ASYNC_STATUS_DONE ||
19711984
status4 == JANET_ASYNC_STATUS_DONE)
19721985
janet_unlisten(state, 0);
1986+
/* Close the stream if requested and no more listeners are left */
1987+
if ((stream->flags & JANET_STREAM_TOCLOSE) && !stream->state) {
1988+
janet_stream_close(stream);
1989+
}
19731990
}
19741991
}
19751992

@@ -2456,7 +2473,8 @@ void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, in
24562473
typedef enum {
24572474
JANET_ASYNC_WRITEMODE_WRITE,
24582475
JANET_ASYNC_WRITEMODE_SEND,
2459-
JANET_ASYNC_WRITEMODE_SENDTO
2476+
JANET_ASYNC_WRITEMODE_SENDTO,
2477+
JANET_ASYNC_WRITEMODE_CONNECT
24602478
} JanetWriteMode;
24612479

24622480
typedef struct {
@@ -2480,6 +2498,31 @@ typedef struct {
24802498
#endif
24812499
} StateWrite;
24822500

2501+
static JanetAsyncStatus handle_connect(JanetListenerState *s) {
2502+
#ifdef JANET_WINDOWS
2503+
int res = 0;
2504+
int size = sizeof(res);
2505+
int r = getsockopt((SOCKET)s->stream->handle, SOL_SOCKET, SO_ERROR, (char *)&res, &size);
2506+
#else
2507+
int res = 0;
2508+
socklen_t size = sizeof res;
2509+
int r = getsockopt(s->stream->handle, SOL_SOCKET, SO_ERROR, &res, &size);
2510+
#endif
2511+
if (r == 0) {
2512+
if (res == 0) {
2513+
janet_schedule(s->fiber, janet_wrap_abstract(s->stream));
2514+
} else {
2515+
s->stream->flags |= JANET_STREAM_TOCLOSE;
2516+
janet_cancel(s->fiber, janet_cstringv(strerror(res)));
2517+
}
2518+
} else {
2519+
s->stream->flags |= JANET_STREAM_TOCLOSE;
2520+
janet_cancel(s->fiber, janet_ev_lasterr());
2521+
}
2522+
return JANET_ASYNC_STATUS_DONE;
2523+
}
2524+
2525+
24832526
JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event) {
24842527
StateWrite *state = (StateWrite *) s;
24852528
switch (event) {
@@ -2509,6 +2552,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
25092552
}
25102553
break;
25112554
case JANET_ASYNC_EVENT_USER: {
2555+
#ifdef JANET_NET
2556+
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
2557+
return handle_connect(s);
2558+
}
2559+
#endif
25122560
/* Begin write */
25132561
int32_t len;
25142562
const uint8_t *bytes;
@@ -2572,6 +2620,11 @@ JanetAsyncStatus ev_machine_write(JanetListenerState *s, JanetAsyncEvent event)
25722620
janet_cancel(s->fiber, janet_cstringv("stream hup"));
25732621
return JANET_ASYNC_STATUS_DONE;
25742622
case JANET_ASYNC_EVENT_WRITE: {
2623+
#ifdef JANET_NET
2624+
if (state->mode == JANET_ASYNC_WRITEMODE_CONNECT) {
2625+
return handle_connect(s);
2626+
}
2627+
#endif
25752628
int32_t start, len;
25762629
const uint8_t *bytes;
25772630
start = state->start;
@@ -2674,6 +2727,10 @@ void janet_ev_sendto_buffer(JanetStream *stream, JanetBuffer *buf, void *dest, i
26742727
void janet_ev_sendto_string(JanetStream *stream, JanetString str, void *dest, int flags) {
26752728
janet_ev_write_generic(stream, (void *) str, dest, JANET_ASYNC_WRITEMODE_SENDTO, 0, flags);
26762729
}
2730+
2731+
void janet_ev_connect(JanetStream *stream, int flags) {
2732+
janet_ev_write_generic(stream, NULL, NULL, JANET_ASYNC_WRITEMODE_CONNECT, 0, flags);
2733+
}
26772734
#endif
26782735

26792736
/* For a pipe ID */

src/core/net.c

+21-10
Original file line numberDiff line numberDiff line change
@@ -477,32 +477,43 @@ JANET_CORE_FN(cfun_net_connect,
477477
}
478478
}
479479

480+
/* Wrap socket in abstract type JanetStream */
481+
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
482+
483+
/* Set up the socket for non-blocking IO before connecting */
484+
janet_net_socknoblock(sock);
485+
480486
/* Connect to socket */
481487
#ifdef JANET_WINDOWS
482488
int status = WSAConnect(sock, addr, addrlen, NULL, NULL, NULL, NULL);
483-
Janet lasterr = janet_ev_lasterr();
489+
int err = WSAGetLastError();
484490
freeaddrinfo(ai);
485491
#else
486492
int status = connect(sock, addr, addrlen);
487-
Janet lasterr = janet_ev_lasterr();
493+
int err = errno;
488494
if (is_unix) {
489495
janet_free(ai);
490496
} else {
491497
freeaddrinfo(ai);
492498
}
493499
#endif
494500

495-
if (status == -1) {
496-
JSOCKCLOSE(sock);
497-
janet_panicf("could not connect socket: %V", lasterr);
501+
if (status != 0) {
502+
#ifdef JANET_WINDOWS
503+
if (err != WSAEWOULDBLOCK) {
504+
#else
505+
if (err != EINPROGRESS) {
506+
#endif
507+
JSOCKCLOSE(sock);
508+
Janet lasterr = janet_ev_lasterr();
509+
janet_panicf("could not connect socket: %V", lasterr);
510+
}
498511
}
499512

500-
/* Set up the socket for non-blocking IO after connect - TODO - non-blocking connect? */
501-
janet_net_socknoblock(sock);
513+
/* Handle the connect() result in the event loop*/
514+
janet_ev_connect(stream, MSG_NOSIGNAL);
502515

503-
/* Wrap socket in abstract type JanetStream */
504-
JanetStream *stream = make_stream(sock, JANET_STREAM_READABLE | JANET_STREAM_WRITABLE);
505-
return janet_wrap_abstract(stream);
516+
janet_await();
506517
}
507518

508519
static const char *serverify_socket(JSock sfd) {

src/include/janet.h

+2
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,7 @@ typedef void *JanetAbstract;
568568
#define JANET_STREAM_WRITABLE 0x400
569569
#define JANET_STREAM_ACCEPTABLE 0x800
570570
#define JANET_STREAM_UDPSERVER 0x1000
571+
#define JANET_STREAM_TOCLOSE 0x10000
571572

572573
typedef enum {
573574
JANET_ASYNC_EVENT_INIT,
@@ -1479,6 +1480,7 @@ JANET_API void janet_ev_readchunk(JanetStream *stream, JanetBuffer *buf, int32_t
14791480
JANET_API void janet_ev_recv(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
14801481
JANET_API void janet_ev_recvchunk(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
14811482
JANET_API void janet_ev_recvfrom(JanetStream *stream, JanetBuffer *buf, int32_t nbytes, int flags);
1483+
JANET_API void janet_ev_connect(JanetStream *stream, int flags);
14821484
#endif
14831485

14841486
/* Write async to a stream */

0 commit comments

Comments
 (0)