Skip to content
This repository was archived by the owner on Mar 4, 2024. It is now read-only.

Add additional async getaddrinfo step to outgoing tcp connections #301

Merged
merged 2 commits into from
Aug 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion src/uv_ip.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,52 @@

#include "uv_ip.h"

static const char *strCpyUntil(char *target,
const char *source,
size_t target_size,
char separator)
{
size_t i;
for (i = 0; i < target_size; ++i) {
if (!source[i] || source[i] == separator) {
target[i] = 0;
return source + i;
} else {
target[i] = source[i];
}
}
return NULL;
}

int uvIpAddrSplit(const char *address,
char *host,
size_t host_size,
char *service,
size_t service_size)
{
char colon = ':';
const char *service_ptr = NULL;

if (host) {
service_ptr = strCpyUntil(host, address, host_size, colon);
if (!service_ptr) {
return RAFT_NAMETOOLONG;
}
}
if (service) {
if (!service_ptr) {
service_ptr = strchr(address, colon);
}
if (!service_ptr || *service_ptr == 0 || *(++service_ptr) == 0) {
service_ptr = "8080";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's special about 8080? Maybe document this somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a question for @freeekanayaka as I kept it fully compatible with the existing uvIpParse (which was used before). And he introduced this default with the first version of the file visible to me in the repo.

}
if (!strCpyUntil(service, service_ptr, service_size, 0)) {
return RAFT_NAMETOOLONG;
}
}
return 0;
}

int uvIpParse(const char *address, struct sockaddr_in *addr)
{
char buf[256];
Expand All @@ -17,7 +63,7 @@ int uvIpParse(const char *address, struct sockaddr_in *addr)
int rv;

/* TODO: turn this poor man parsing into proper one */
n = sizeof(buf)-1;
n = sizeof(buf) - 1;
strncpy(buf, address, n);
buf[n] = '\0';
host = strtok(buf, colon);
Expand Down
7 changes: 7 additions & 0 deletions src/uv_ip.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@

#include <netinet/in.h>

/* Split @address into @host and @service. */
int uvIpAddrSplit(const char *address,
char *host,
size_t host_size,
char *service,
size_t service_size);

/* Split @address into @host and @port and populate @addr accordingly. */
int uvIpParse(const char *address, struct sockaddr_in *addr);

Expand Down
116 changes: 91 additions & 25 deletions src/uv_tcp_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@
/* Hold state for a single connection request. */
struct uvTcpConnect
{
struct UvTcp *t; /* Transport implementation */
struct raft_uv_connect *req; /* User request */
uv_buf_t handshake; /* Handshake data */
struct uv_tcp_s *tcp; /* TCP connection socket handle */
struct uv_connect_s connect; /* TCP connection request */
struct uv_write_s write; /* TCP handshake request */
int status; /* Returned to the request callback */
queue queue; /* Pending connect queue */
struct UvTcp *t; /* Transport implementation */
struct raft_uv_connect *req; /* User request */
uv_buf_t handshake; /* Handshake data */
struct uv_tcp_s *tcp; /* TCP connection socket handle */
struct uv_getaddrinfo_s getaddrinfo; /* DNS resolve request */
struct uv_connect_s connect; /* TCP connection request */
struct uv_write_s write; /* TCP handshake request */
int status; /* Returned to the request callback */
bool resolving; /* Indicate name resolving in progress */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aligning with the comments above would be a bit more aesthetically pleasing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As the project has a .clang-format file in it's root folder I ran a clang-format on all files I touched. My expectation was all format will be fine after that.

Copy link
Contributor Author

@NorbertHeusser NorbertHeusser Aug 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting: Using the clang-format option AlignTrailingComments: true seems not to work on comment blocks, but just on single line comments //
Not sure, if it you are willing to use // for single line comments, which would solve this problem based on clang-format. But some people don't like using the c++ originated sigle line comment in C file (even knowing this is allowed since C99).

queue queue; /* Pending connect queue */
};

/* Encode an handshake message into the given buffer. */
Expand Down Expand Up @@ -65,6 +67,7 @@ static void uvTcpConnectFinish(struct uvTcpConnect *connect)
int status = connect->status;
QUEUE_REMOVE(&connect->queue);
RaftHeapFree(connect->handshake.base);
uv_freeaddrinfo(connect->getaddrinfo.addrinfo);
raft_free(connect);
req->cb(req, stream, status);
}
Expand All @@ -88,7 +91,13 @@ static void uvTcpConnectAbort(struct uvTcpConnect *connect)
{
QUEUE_REMOVE(&connect->queue);
QUEUE_PUSH(&connect->t->aborting, &connect->queue);
uv_close((struct uv_handle_s *)connect->tcp, uvTcpConnectUvCloseCb);
uv_cancel((struct uv_req_s *)&connect->getaddrinfo);
/* Call uv_close on the tcp handle, if there is no getaddrinfo request
in flight. Data structures may only be freed after the uvGetAddrInfoCb was
triggered. Tcp handle will be closed in the uvGetAddrInfoCb in this case. */
if (!connect->resolving) {
uv_close((struct uv_handle_s *)connect->tcp, uvTcpConnectUvCloseCb);
}
}

/* The handshake TCP write completes. Fire the connect callback. */
Expand Down Expand Up @@ -146,55 +155,111 @@ static void uvTcpConnectUvConnectCb(struct uv_connect_s *req, int status)
uvTcpConnectAbort(connect);
}

/* Create a new TCP handle and submit a connection request to the event loop. */
static int uvTcpConnectStart(struct uvTcpConnect *r, const char *address)
/* The hostname resolve is finished */
static void uvGetAddrInfoCb(uv_getaddrinfo_t *req,
int status,
struct addrinfo *res)
{
struct UvTcp *t = r->t;
struct sockaddr_in addr;
struct uvTcpConnect *connect = req->data;
struct UvTcp *t = connect->t;
int rv;

rv = uvIpParse(address, &addr);
connect->resolving =
false; /* Indicate we are in the name resolving phase */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better to write "Indicate that we are done with the resolving phase"


if (t->closing) {
connect->status = RAFT_CANCELED;

/* We need to close the tcp handle to to connection attempt */
uv_close((struct uv_handle_s *)connect->tcp, uvTcpConnectUvCloseCb);
return;
}

if (status < 0) {
ErrMsgPrintf(t->transport->errmsg, "uv_getaddrinfo(): %s",
uv_err_name(status));
connect->status = RAFT_NOCONNECTION;
goto err;
}
rv = uv_tcp_connect(&connect->connect, connect->tcp,
(const struct sockaddr *)res->ai_addr,
uvTcpConnectUvConnectCb);
if (rv != 0) {
/* UNTESTED: since parsing succeed, this should fail only because of
* lack of system resources */
ErrMsgPrintf(t->transport->errmsg, "uv_tcp_connect(): %s",
uv_strerror(rv));
connect->status = RAFT_NOCONNECTION;
goto err;
}

return;

err:
uvTcpConnectAbort(connect);
}
/* Create a new TCP handle and submit a connection request to the event loop. */
static int uvTcpConnectStart(struct uvTcpConnect *r, const char *address)
{
static struct addrinfo hints = {
.ai_flags = AI_V4MAPPED | AI_ADDRCONFIG | AI_NUMERICHOST,
.ai_family = AF_INET,
.ai_socktype = SOCK_STREAM,
.ai_protocol = 0};
struct UvTcp *t = r->t;
char hostname[NI_MAXHOST];
char service[NI_MAXSERV];
int rv;

r->handshake.base = NULL;

/* Initialize the handshake buffer. */
rv = uvTcpEncodeHandshake(t->id, t->address, &r->handshake);
if (rv != 0) {
assert(rv == RAFT_NOMEM);
ErrMsgOom(r->t->transport->errmsg);
ErrMsgOom(t->transport->errmsg);
goto err;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handshake.base is freed after err, but never allocated. It looks safe right now, but it's not very pretty and can lead to an erroneous free in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initialise the pointer to NULL in the beginning of the function. As the raft_free (like the posix free is safe to be invoked on a NULL pointer) it safe from my perspective.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just the default port if none is specified. We should indeed document it, or perhaps return an error instead.

}

r->tcp = RaftHeapMalloc(sizeof *r->tcp);
if (r->tcp == NULL) {
ErrMsgOom(t->transport->errmsg);
rv = RAFT_NOMEM;
goto err_after_encode_handshake;
goto err;
}

rv = uv_tcp_init(r->t->loop, r->tcp);
assert(rv == 0);
r->tcp->data = r;

rv = uv_tcp_connect(&r->connect, r->tcp, (struct sockaddr *)&addr,
uvTcpConnectUvConnectCb);
if (rv != 0) {
/* UNTESTED: since parsing succeed, this should fail only because of
* lack of system resources */
ErrMsgPrintf(t->transport->errmsg, "uv_tcp_connect(): %s",
rv = uvIpAddrSplit(address, hostname, sizeof(hostname), service,
sizeof(service));
if (rv) {
ErrMsgPrintf(t->transport->errmsg,
"uv_tcp_connect(): Cannot split %s into host and service",
address);
rv = RAFT_NOCONNECTION;
goto err_after_tcp_init;
}
rv = uv_getaddrinfo(r->t->loop, &r->getaddrinfo, &uvGetAddrInfoCb, hostname,
service, &hints);
if (rv) {
ErrMsgPrintf(t->transport->errmsg,
"uv_tcp_connect(): Cannot initiate getaddrinfo %s",
uv_strerror(rv));
rv = RAFT_NOCONNECTION;
goto err_after_tcp_init;
}
r->resolving = true; /* Indicate we are in the name resolving phase */

return 0;

err_after_tcp_init:
uv_close((uv_handle_t *)r->tcp, (uv_close_cb)RaftHeapFree);
err_after_encode_handshake:
RaftHeapFree(r->handshake.base);

err:
RaftHeapFree(r->handshake.base);

return rv;
}

Expand All @@ -221,8 +286,9 @@ int UvTcpConnect(struct raft_uv_transport *transport,
r->req = req;
r->status = 0;
r->write.data = r;
r->getaddrinfo.data = r;
r->resolving = false;
r->connect.data = r;

req->cb = cb;

/* Keep track of the pending request */
Expand Down
36 changes: 34 additions & 2 deletions test/integration/test_uv_tcp_connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ static void tearDown(void *data)
*****************************************************************************/

#define BOGUS_ADDRESS "127.0.0.1:6666"
#define INVALID_ADDRESS "500.0.0.1:6666"

SUITE(tcp_connect)

Expand Down Expand Up @@ -214,13 +215,21 @@ TEST(tcp_connect, closeImmediately, setUp, tearDownDeps, 0, NULL)
}

/* The transport gets closed during the handshake. */
TEST(tcp_connect, closeDuringHandshake, setUp, tearDownDeps, 0, NULL)
TEST(tcp_connect, closeDuringDnsLookup, setUp, tearDownDeps, 0, NULL)
{
struct fixture *f = data;
CONNECT_CLOSE(2, TCP_SERVER_ADDRESS, 1);
return MUNIT_OK;
}

/* The transport gets closed during the handshake. */
TEST(tcp_connect, closeDuringHandshake, setUp, tearDownDeps, 0, NULL)
{
struct fixture *f = data;
CONNECT_CLOSE(2, TCP_SERVER_ADDRESS, 2);
return MUNIT_OK;
}

static void checkCb(struct uv_check_s *check)
{
struct fixture *f = check->data;
Expand All @@ -230,7 +239,7 @@ static void checkCb(struct uv_check_s *check)

/* The transport gets closed right after a connection failure, while the
* connection attempt is being aborted. */
TEST(tcp_connect, closeDuringAbort, setUp, tearDownDeps, 0, NULL)
TEST(tcp_connect, closeDuringDnsLookupAbort, setUp, tearDownDeps, 0, NULL)
{
struct fixture *f = data;
struct uv_check_s check;
Expand All @@ -241,7 +250,30 @@ TEST(tcp_connect, closeDuringAbort, setUp, tearDownDeps, 0, NULL)
munit_assert_int(rv, ==, 0);
check.data = f;
uv_check_start(&check, checkCb);
CONNECT_REQ(2, INVALID_ADDRESS, 0, RAFT_NOCONNECTION);
LOOP_RUN(1);
LOOP_RUN_UNTIL(&_result.done);
CLOSE_WAIT;
return MUNIT_OK;
}

/* The transport gets closed right after a connection failure, while the
* connection attempt is being aborted. */
TEST(tcp_connect, closeDuringConnectAbort, setUp, tearDownDeps, 0, NULL)
{
struct fixture *f = data;
struct uv_check_s check;
int rv;
/* Use a check handle in order to close the transport in the same loop
* iteration where the connection failure occurs. */
rv = uv_check_init(&f->loop, &check);
munit_assert_int(rv, ==, 0);
check.data = f;
CONNECT_REQ(2, BOGUS_ADDRESS, 0, RAFT_NOCONNECTION);
/* Successfull DNS lookup will initiate async connect */
LOOP_RUN(1);
// Now start the check handle to fire in the next iteration */
uv_check_start(&check, checkCb);
LOOP_RUN(1);
LOOP_RUN_UNTIL(&_result.done);
CLOSE_WAIT;
Expand Down