Skip to content
This repository was archived by the owner on Mar 4, 2024. It is now read-only.

Commit c4e3819

Browse files
Add additional asnyc getaddrinfo step to outgoing tcp connection
Signed-off-by: Norbert Heusser <norbert.heusser@cedalo.com>
1 parent 3f69e7c commit c4e3819

File tree

4 files changed

+187
-28
lines changed

4 files changed

+187
-28
lines changed

src/uv_ip.c

+47-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,52 @@
77

88
#include "uv_ip.h"
99

10+
static const char *strCpyUntil(char *target,
11+
const char *source,
12+
size_t target_size,
13+
char separator)
14+
{
15+
size_t i;
16+
for (i = 0; i < target_size; ++i) {
17+
if (!source[i] || source[i] == separator) {
18+
target[i] = 0;
19+
return source + i;
20+
} else {
21+
target[i] = source[i];
22+
}
23+
}
24+
return NULL;
25+
}
26+
27+
int uvIpAddrSplit(const char *address,
28+
char *host,
29+
size_t host_size,
30+
char *service,
31+
size_t service_size)
32+
{
33+
char colon = ':';
34+
const char *service_ptr = NULL;
35+
36+
if (host) {
37+
service_ptr = strCpyUntil(host, address, host_size, colon);
38+
if (!service_ptr) {
39+
return RAFT_NAMETOOLONG;
40+
}
41+
}
42+
if (service) {
43+
if (!service_ptr) {
44+
service_ptr = strchr(address, colon);
45+
}
46+
if (!service_ptr || *service_ptr == 0 || *(++service_ptr) == 0) {
47+
service_ptr = "8080";
48+
}
49+
if (!strCpyUntil(service, service_ptr, service_size, 0)) {
50+
return RAFT_NAMETOOLONG;
51+
}
52+
}
53+
return 0;
54+
}
55+
1056
int uvIpParse(const char *address, struct sockaddr_in *addr)
1157
{
1258
char buf[256];
@@ -17,7 +63,7 @@ int uvIpParse(const char *address, struct sockaddr_in *addr)
1763
int rv;
1864

1965
/* TODO: turn this poor man parsing into proper one */
20-
n = sizeof(buf)-1;
66+
n = sizeof(buf) - 1;
2167
strncpy(buf, address, n);
2268
buf[n] = '\0';
2369
host = strtok(buf, colon);

src/uv_ip.h

+7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55

66
#include <netinet/in.h>
77

8+
/* Split @address into @host and @service. */
9+
int uvIpAddrSplit(const char *address,
10+
char *host,
11+
size_t host_size,
12+
char *service,
13+
size_t service_size);
14+
815
/* Split @address into @host and @port and populate @addr accordingly. */
916
int uvIpParse(const char *address, struct sockaddr_in *addr);
1017

src/uv_tcp_connect.c

+99-25
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@
2525
/* Hold state for a single connection request. */
2626
struct uvTcpConnect
2727
{
28-
struct UvTcp *t; /* Transport implementation */
29-
struct raft_uv_connect *req; /* User request */
30-
uv_buf_t handshake; /* Handshake data */
31-
struct uv_tcp_s *tcp; /* TCP connection socket handle */
32-
struct uv_connect_s connect; /* TCP connection request */
33-
struct uv_write_s write; /* TCP handshake request */
34-
int status; /* Returned to the request callback */
35-
queue queue; /* Pending connect queue */
28+
struct UvTcp *t; /* Transport implementation */
29+
struct raft_uv_connect *req; /* User request */
30+
uv_buf_t handshake; /* Handshake data */
31+
struct uv_tcp_s *tcp; /* TCP connection socket handle */
32+
struct uv_getaddrinfo_s getaddrinfo; /* DNS resolve request */
33+
struct uv_connect_s connect; /* TCP connection request */
34+
struct uv_write_s write; /* TCP handshake request */
35+
uv_check_t delayedtcpclose; /* A check handle required to delay closing tcp */
36+
int status; /* Returned to the request callback */
37+
queue queue; /* Pending connect queue */
3638
};
3739

3840
/* Encode an handshake message into the given buffer. */
@@ -64,7 +66,9 @@ static void uvTcpConnectFinish(struct uvTcpConnect *connect)
6466
struct raft_uv_connect *req = connect->req;
6567
int status = connect->status;
6668
QUEUE_REMOVE(&connect->queue);
69+
uv_close((struct uv_handle_s *)&connect->delayedtcpclose, NULL);
6770
RaftHeapFree(connect->handshake.base);
71+
uv_freeaddrinfo(connect->getaddrinfo.addrinfo);
6872
raft_free(connect);
6973
req->cb(req, stream, status);
7074
}
@@ -83,12 +87,29 @@ static void uvTcpConnectUvCloseCb(struct uv_handle_s *handle)
8387
UvTcpMaybeFireCloseCb(t);
8488
}
8589

90+
static void uvTcpConnectCheckMayClose(uv_check_t *handle)
91+
{
92+
struct uvTcpConnect *connect = handle->data;
93+
if (connect->status || uv_is_active((uv_handle_t *)connect->tcp)) {
94+
uv_check_stop(&connect->delayedtcpclose);
95+
uv_close((struct uv_handle_s *)connect->tcp, uvTcpConnectUvCloseCb);
96+
}
97+
}
98+
8699
/* Abort a connection request. */
87100
static void uvTcpConnectAbort(struct uvTcpConnect *connect)
88101
{
89102
QUEUE_REMOVE(&connect->queue);
90103
QUEUE_PUSH(&connect->t->aborting, &connect->queue);
91-
uv_close((struct uv_handle_s *)connect->tcp, uvTcpConnectUvCloseCb);
104+
if (uv_cancel((struct uv_req_s *)&connect->getaddrinfo) == 0 ||
105+
!uv_is_active((uv_handle_t *)connect->tcp)) {
106+
/* If canceling the addrinfo call was not successfull, but the tcp
107+
handle is not active the getaddrinfo is in progress and we need to
108+
delay closing the tcp handle until it's finished */
109+
uv_check_start(&connect->delayedtcpclose, uvTcpConnectCheckMayClose);
110+
} else {
111+
uv_close((struct uv_handle_s *)connect->tcp, uvTcpConnectUvCloseCb);
112+
}
92113
}
93114

94115
/* The handshake TCP write completes. Fire the connect callback. */
@@ -146,43 +167,93 @@ static void uvTcpConnectUvConnectCb(struct uv_connect_s *req, int status)
146167
uvTcpConnectAbort(connect);
147168
}
148169

149-
/* Create a new TCP handle and submit a connection request to the event loop. */
150-
static int uvTcpConnectStart(struct uvTcpConnect *r, const char *address)
170+
static void uvGetAddrInfoCb(uv_getaddrinfo_t *req,
171+
int status,
172+
struct addrinfo *res)
151173
{
152-
struct UvTcp *t = r->t;
153-
struct sockaddr_in addr;
174+
struct uvTcpConnect *connect = req->data;
175+
struct UvTcp *t = connect->t;
154176
int rv;
155177

156-
rv = uvIpParse(address, &addr);
178+
if (t->closing || status == UV_ECANCELED) {
179+
connect->status = RAFT_CANCELED;
180+
return;
181+
}
182+
183+
if (status < 0) {
184+
ErrMsgPrintf(t->transport->errmsg, "uv_getaddrinfo(): %s",
185+
uv_err_name(status));
186+
connect->status = RAFT_NOCONNECTION;
187+
goto err;
188+
}
189+
rv = uv_tcp_connect(&connect->connect, connect->tcp,
190+
(const struct sockaddr *)res->ai_addr,
191+
uvTcpConnectUvConnectCb);
157192
if (rv != 0) {
193+
/* UNTESTED: since parsing succeed, this should fail only because of
194+
* lack of system resources */
195+
ErrMsgPrintf(t->transport->errmsg, "uv_tcp_connect(): %s",
196+
uv_strerror(rv));
197+
connect->status = RAFT_NOCONNECTION;
158198
goto err;
159199
}
160200

201+
return;
202+
203+
err:
204+
uvTcpConnectAbort(connect);
205+
}
206+
/* Create a new TCP handle and submit a connection request to the event loop. */
207+
static int uvTcpConnectStart(struct uvTcpConnect *r, const char *address)
208+
{
209+
static struct addrinfo hints = {
210+
.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | AI_NUMERICHOST,
211+
.ai_family = AF_INET,
212+
.ai_socktype = SOCK_STREAM,
213+
.ai_protocol = 0};
214+
struct UvTcp *t = r->t;
215+
char hostname[NI_MAXHOST];
216+
char service[NI_MAXSERV];
217+
int rv;
218+
219+
r->handshake.base = NULL;
220+
161221
/* Initialize the handshake buffer. */
162222
rv = uvTcpEncodeHandshake(t->id, t->address, &r->handshake);
163223
if (rv != 0) {
164224
assert(rv == RAFT_NOMEM);
165-
ErrMsgOom(r->t->transport->errmsg);
225+
ErrMsgOom(t->transport->errmsg);
166226
goto err;
167227
}
168228

169229
r->tcp = RaftHeapMalloc(sizeof *r->tcp);
170230
if (r->tcp == NULL) {
171231
ErrMsgOom(t->transport->errmsg);
172232
rv = RAFT_NOMEM;
173-
goto err_after_encode_handshake;
233+
goto err;
174234
}
175235

236+
rv = uv_check_init(t->loop, &r->delayedtcpclose);
237+
assert(rv == 0);
238+
176239
rv = uv_tcp_init(r->t->loop, r->tcp);
177240
assert(rv == 0);
178241
r->tcp->data = r;
179242

180-
rv = uv_tcp_connect(&r->connect, r->tcp, (struct sockaddr *)&addr,
181-
uvTcpConnectUvConnectCb);
182-
if (rv != 0) {
183-
/* UNTESTED: since parsing succeed, this should fail only because of
184-
* lack of system resources */
185-
ErrMsgPrintf(t->transport->errmsg, "uv_tcp_connect(): %s",
243+
rv = uvIpAddrSplit(address, hostname, sizeof(hostname), service,
244+
sizeof(service));
245+
if (rv) {
246+
ErrMsgPrintf(t->transport->errmsg,
247+
"uv_tcp_connect(): Cannot split %s into host and service",
248+
address);
249+
rv = RAFT_NOCONNECTION;
250+
goto err_after_tcp_init;
251+
}
252+
rv = uv_getaddrinfo(r->t->loop, &r->getaddrinfo, &uvGetAddrInfoCb, hostname,
253+
service, &hints);
254+
if (rv) {
255+
ErrMsgPrintf(t->transport->errmsg,
256+
"uv_tcp_connect(): Cannot initiate getaddrinfo %s",
186257
uv_strerror(rv));
187258
rv = RAFT_NOCONNECTION;
188259
goto err_after_tcp_init;
@@ -192,9 +263,11 @@ static int uvTcpConnectStart(struct uvTcpConnect *r, const char *address)
192263

193264
err_after_tcp_init:
194265
uv_close((uv_handle_t *)r->tcp, (uv_close_cb)RaftHeapFree);
195-
err_after_encode_handshake:
196-
RaftHeapFree(r->handshake.base);
266+
uv_close((uv_handle_t *)&r->delayedtcpclose, NULL);
267+
197268
err:
269+
RaftHeapFree(r->handshake.base);
270+
198271
return rv;
199272
}
200273

@@ -221,8 +294,9 @@ int UvTcpConnect(struct raft_uv_transport *transport,
221294
r->req = req;
222295
r->status = 0;
223296
r->write.data = r;
297+
r->getaddrinfo.data = r;
224298
r->connect.data = r;
225-
299+
r->delayedtcpclose.data = r;
226300
req->cb = cb;
227301

228302
/* Keep track of the pending request */

test/integration/test_uv_tcp_connect.c

+34-2
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ static void tearDown(void *data)
165165
*****************************************************************************/
166166

167167
#define BOGUS_ADDRESS "127.0.0.1:6666"
168+
#define INVALID_ADDRESS "500.0.0.1:6666"
168169

169170
SUITE(tcp_connect)
170171

@@ -214,13 +215,21 @@ TEST(tcp_connect, closeImmediately, setUp, tearDownDeps, 0, NULL)
214215
}
215216

216217
/* The transport gets closed during the handshake. */
217-
TEST(tcp_connect, closeDuringHandshake, setUp, tearDownDeps, 0, NULL)
218+
TEST(tcp_connect, closeDuringDnsLookup, setUp, tearDownDeps, 0, NULL)
218219
{
219220
struct fixture *f = data;
220221
CONNECT_CLOSE(2, TCP_SERVER_ADDRESS, 1);
221222
return MUNIT_OK;
222223
}
223224

225+
/* The transport gets closed during the handshake. */
226+
TEST(tcp_connect, closeDuringHandshake, setUp, tearDownDeps, 0, NULL)
227+
{
228+
struct fixture *f = data;
229+
CONNECT_CLOSE(2, TCP_SERVER_ADDRESS, 2);
230+
return MUNIT_OK;
231+
}
232+
224233
static void checkCb(struct uv_check_s *check)
225234
{
226235
struct fixture *f = check->data;
@@ -230,7 +239,7 @@ static void checkCb(struct uv_check_s *check)
230239

231240
/* The transport gets closed right after a connection failure, while the
232241
* connection attempt is being aborted. */
233-
TEST(tcp_connect, closeDuringAbort, setUp, tearDownDeps, 0, NULL)
242+
TEST(tcp_connect, closeDuringDnsLookupAbort, setUp, tearDownDeps, 0, NULL)
234243
{
235244
struct fixture *f = data;
236245
struct uv_check_s check;
@@ -241,7 +250,30 @@ TEST(tcp_connect, closeDuringAbort, setUp, tearDownDeps, 0, NULL)
241250
munit_assert_int(rv, ==, 0);
242251
check.data = f;
243252
uv_check_start(&check, checkCb);
253+
CONNECT_REQ(2, INVALID_ADDRESS, 0, RAFT_NOCONNECTION);
254+
LOOP_RUN(1);
255+
LOOP_RUN_UNTIL(&_result.done);
256+
CLOSE_WAIT;
257+
return MUNIT_OK;
258+
}
259+
260+
/* The transport gets closed right after a connection failure, while the
261+
* connection attempt is being aborted. */
262+
TEST(tcp_connect, closeDuringConnectAbort, setUp, tearDownDeps, 0, NULL)
263+
{
264+
struct fixture *f = data;
265+
struct uv_check_s check;
266+
int rv;
267+
/* Use a check handle in order to close the transport in the same loop
268+
* iteration where the connection failure occurs. */
269+
rv = uv_check_init(&f->loop, &check);
270+
munit_assert_int(rv, ==, 0);
271+
check.data = f;
244272
CONNECT_REQ(2, BOGUS_ADDRESS, 0, RAFT_NOCONNECTION);
273+
/* Successfull DNS lookup will initiate async connect */
274+
LOOP_RUN(1);
275+
// Now start the check handle to fire in the next iteration */
276+
uv_check_start(&check, checkCb);
245277
LOOP_RUN(1);
246278
LOOP_RUN_UNTIL(&_result.done);
247279
CLOSE_WAIT;

0 commit comments

Comments
 (0)