diff --git a/Makefile.am b/Makefile.am index e3fdc4152..2137932d9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -11,7 +11,7 @@ raftinclude_HEADERS = lib_LTLIBRARIES = libraft.la libraft_la_CFLAGS = $(AM_CFLAGS) -fvisibility=hidden -libraft_la_LDFLAGS = -version-info 2:0:0 +libraft_la_LDFLAGS = -version-info 3:0:0 libraft_la_SOURCES = \ src/byte.c \ src/client.c \ diff --git a/configure.ac b/configure.ac index 32015d3ba..861780861 100644 --- a/configure.ac +++ b/configure.ac @@ -1,5 +1,5 @@ AC_PREREQ(2.60) -AC_INIT([raft], [0.16.0]) +AC_INIT([raft], [0.17.0]) AC_LANG([C]) AC_CONFIG_MACRO_DIR([m4]) AC_CONFIG_AUX_DIR([ac]) diff --git a/example/server.c b/example/server.c index 5b4883107..cd75c52c2 100644 --- a/example/server.c +++ b/example/server.c @@ -181,6 +181,8 @@ static int ServerInit(struct Server *s, s->timer.data = s; /* Initialize the TCP-based RPC transport. */ + s->transport.version = 1; + s->transport.data = NULL; rv = raft_uv_tcp_init(&s->transport, s->loop); if (rv != 0) { goto err; diff --git a/include/raft.h b/include/raft.h index 608622e88..987d766ec 100644 --- a/include/raft.h +++ b/include/raft.h @@ -4,10 +4,21 @@ #include #include #include +#include #include #define RAFT_API __attribute__((visibility("default"))) +/** + * Version. + */ +#define RAFT_VERSION_MAJOR 1 +#define RAFT_VERSION_MINOR 17 +#define RAFT_VERSION_RELEASE 0 +#define RAFT_VERSION_NUMBER (RAFT_VERSION_MAJOR *100*100 + RAFT_VERSION_MINOR *100 + RAFT_VERSION_RELEASE) + +int raft_version_number (void); + /** * Error codes. */ @@ -73,15 +84,6 @@ struct raft_buffer size_t len; /* Length of the buffer. */ }; -/** - * A type for storing unknown bools. - */ -typedef enum { - raft_tribool_unknown, - raft_tribool_true, - raft_tribool_false, -} raft_tribool; -#define TO_RAFT_TRIBOOL(b) ((b) ? raft_tribool_true : raft_tribool_false) /** * Server role codes. @@ -94,6 +96,7 @@ enum { /** * Hold information about a single server in the cluster configuration. + * WARNING: This struct is encoded/decoded, be careful when adapting it. */ struct raft_server { @@ -104,6 +107,7 @@ struct raft_server /** * Hold information about all servers currently part of the cluster. + * WARNING: This struct is encoded/decoded, be careful when adapting it. */ struct raft_configuration { @@ -201,50 +205,6 @@ struct raft_entry void *batch; /* Batch that buf's memory points to, if any. */ }; -/** - * Counter for outstanding references to a log entry. - * - * When an entry is first appended to the log, its refcount is set to one (the - * log itself is the only one referencing the entry). Whenever an entry is - * included in an I/O request (to write it to disk or to send it to other - * servers) its refcount is increased by one. Whenever an entry gets deleted - * from the log its refcount is decreased by one. Likewise, whenever an I/O - * request is completed the refcount of the relevant entries is decreased by - * one. When the refcount drops to zero the memory that its @buf attribute - * points to gets released, or, if the @batch attribute is non-NULL, a check is - * made to see if all other entries of the same batch also have a zero refcount, - * and the memory that @batch points to gets released if that's the case. - */ -struct raft_entry_ref -{ - raft_term term; /* Term of the entry being ref-counted. */ - raft_index index; /* Index of the entry being ref-counted. */ - unsigned short count; /* Number of references. */ - struct raft_entry_ref *next; /* Next item in the bucket (for collisions). */ -}; - -/** - * In-memory cache of the persistent raft log stored on disk. - * - * The raft log cache is implemented as a circular buffer of log entries, which - * makes some frequent operations very efficient (e.g. deleting the first N - * entries when snapshotting). - */ -struct raft_log -{ - struct raft_entry *entries; /* Circular buffer of log entries. */ - size_t size; /* Number of available slots in the buffer. */ - size_t front, back; /* Indexes of used slots [front, back). */ - raft_index offset; /* Index of first entry is offset+1. */ - struct raft_entry_ref *refs; /* Log entries reference counts hash table. */ - size_t refs_size; /* Size of the reference counts hash table. */ - struct /* Information about last snapshot, or zero. */ - { - raft_index last_index; /* Snapshot replaces all entries up to here. */ - raft_term last_term; /* Term of last index. */ - } snapshot; -}; - /** * Hold the arguments of a RequestVote RPC. * @@ -252,6 +212,7 @@ struct raft_log */ struct raft_request_vote { + int version; raft_term term; /* Candidate's term. */ raft_id candidate_id; /* ID of the server requesting the vote. */ raft_index last_log_index; /* Index of candidate's last log entry. */ @@ -259,16 +220,19 @@ struct raft_request_vote bool disrupt_leader; /* True if current leader should be discarded. */ bool pre_vote; /* True if this is a pre-vote request. */ }; +#define RAFT_REQUEST_VOTE_VERSION 2 /** * Hold the result of a RequestVote RPC. */ struct raft_request_vote_result { + int version; raft_term term; /* Receiver's current term (candidate updates itself). */ bool vote_granted; /* True means candidate received vote. */ - raft_tribool pre_vote; /* The response to a pre-vote RequestVote or not. */ + bool pre_vote; /* The response to a pre-vote RequestVote or not. */ }; +#define RAFT_REQUEST_VOTE_RESULT_VERSION 2 /** * Hold the arguments of an AppendEntries RPC. @@ -278,6 +242,7 @@ struct raft_request_vote_result */ struct raft_append_entries { + int version; raft_term term; /* Leader's term. */ raft_index prev_log_index; /* Index of log entry preceeding new ones. */ raft_term prev_log_term; /* Term of entry at prev_log_index. */ @@ -285,22 +250,26 @@ struct raft_append_entries struct raft_entry *entries; /* Log entries to append. */ unsigned n_entries; /* Size of the log entries array. */ }; +#define RAFT_APPEND_ENTRIES_VERSION 0 /** * Hold the result of an AppendEntries RPC (figure 3.1). */ struct raft_append_entries_result { + int version; raft_term term; /* Receiver's current_term. */ raft_index rejected; /* If non-zero, the index that was rejected. */ raft_index last_log_index; /* Receiver's last log entry index, as hint. */ }; +#define RAFT_APPEND_ENTRIES_RESULT_VERSION 0 /** * Hold the arguments of an InstallSnapshot RPC (figure 5.3). */ struct raft_install_snapshot { + int version; raft_term term; /* Leader's term. */ raft_index last_index; /* Index of last entry in the snapshot. */ raft_term last_term; /* Term of last_index. */ @@ -308,6 +277,7 @@ struct raft_install_snapshot raft_index conf_index; /* Commit index of conf. */ struct raft_buffer data; /* Raw snapshot data. */ }; +#define RAFT_INSTALL_SNAPSHOT_VERSION 0 /** * Hold the arguments of a TimeoutNow RPC. @@ -317,10 +287,12 @@ struct raft_install_snapshot */ struct raft_timeout_now { + int version; raft_term term; /* Leader's term. */ raft_index last_log_index; /* Index of leader's last log entry. */ raft_index last_log_term; /* Term of log entry at last_log_index. */ }; +#define RAFT_TIMEOUT_NOW_VERSION 0 /** * Type codes for RPC messages. @@ -336,6 +308,30 @@ enum { /** * A single RPC message that can be sent or received over the network. + * + * The RPC message types all have a `version` field. + * In the libuv io implementation, `version` is filled out during decoding + * and is based on the size of the message on the wire, see e.g. + * `sizeofRequestVoteV1`. The version number in the RAFT_MESSAGE_XXX_VERSION + * macro needs to be bumped every time the message is updated. + * + * Notes when adding a new message type to raft: + * raft_io implementations compiled against old versions of raft don't know the + * new message type and possibly have not allocated enough space for it. When + * such an application receives a new message over the wire, the raft_io + * implementation will err out or drop the message, because it doesn't know how + * to decode it based on its type. + * raft_io implementations compiled against versions of raft that know the new + * message type but at runtime are linked against an older raft lib, will pass + * the message to raft, where raft will drop it. + * When raft receives a message and accesses a field of a new message type, + * the raft_io implementation must have known about the new message type, + * so it was compiled against a modern enough version of raft, and memory + * accesses should be safe. + * + * Sending a new message type with a raft_io implementation that doesn't know + * the type is safe, the implementation should drop the message based on its + * type and will not try to access fields it doesn't know the existence of. */ struct raft_message { @@ -354,6 +350,10 @@ struct raft_message /** * Hold the details of a snapshot. + * The user-provided raft_buffer structs should provide the user with enough + * flexibility to adapt/evolve snapshot formats. + * If this struct would NEED to be adapted in the future, raft can always move to + * a new struct with a new name and a new raft_io version. */ struct raft_snapshot { @@ -473,6 +473,11 @@ typedef void (*raft_io_recv_cb)(struct raft_io *io, struct raft_message *msg); typedef void (*raft_io_close_cb)(struct raft_io *io); +/** + * version field MUST be filled out by user. + * When moving to a new version, the user MUST implement the newly added + * methods. + */ struct raft_io { int version; /* 1 or 2 */ @@ -522,7 +527,11 @@ struct raft_io raft_io_async_work_cb cb); }; -/* +/** + * version field MUST be filled out by user. + * When moving to a new version, the user MUST initialize the new methods, + * either with an implementation or with NULL. + * * version 2: * introduces `snapshot_finalize`, when this method is not NULL, it will * always run after a successful call to `snapshot`, whether the snapshot has @@ -545,6 +554,7 @@ struct raft_io * All memory allocated by the snapshot routines MUST be freed by the snapshot * routines themselves. */ + struct raft_fsm { int version; /* 1, 2 or 3 */ @@ -571,19 +581,7 @@ struct raft_fsm */ enum { RAFT_UNAVAILABLE, RAFT_FOLLOWER, RAFT_CANDIDATE, RAFT_LEADER }; -/** - * Used by leaders to keep track of replication progress for each server. - */ -struct raft_progress -{ - unsigned short state; /* Probe, pipeline or snapshot. */ - raft_index next_index; /* Next entry to send. */ - raft_index match_index; /* Highest index reported as replicated. */ - raft_index snapshot_index; /* Last index of most recent snapshot sent. */ - raft_time last_send; /* Timestamp of last AppendEntries RPC. */ - raft_time snapshot_last_send; /* Timestamp of last InstallSnaphot RPC. */ - bool recent_recv; /* A msg was received within election timeout. */ -}; +struct raft_progress; struct raft; /* Forward declaration. */ @@ -598,8 +596,12 @@ typedef void (*raft_close_cb)(struct raft *raft); struct raft_change; /* Forward declaration */ struct raft_transfer; /* Forward declaration */ +struct raft_log; + /** * Hold and drive the state of a single raft server in a cluster. + * When replacing reserved fields in the middle of this struct, you MUST use a + * type with the same size and alignment requirements as the original type. */ struct raft { @@ -616,7 +618,7 @@ struct raft */ raft_term current_term; /* Latest term server has seen. */ raft_id voted_for; /* Candidate that received vote in current term. */ - struct raft_log log; /* Log entries. */ + struct raft_log *log; /* Log entries. */ /* * Current membership configuration (Chapter 4). @@ -642,8 +644,12 @@ struct raft * non-zero, with the latter being greater than the former. In this case * the content of #configuration must match the one of the log entry at * #configuration_uncommitted_index. + * + * TODO previous_configuration will always contain a copy of the previous + * configuration, if any, and is used in configuration rollback scenarios. */ struct raft_configuration configuration; + struct raft_configuration configuration_previous; //currently not used. raft_index configuration_index; raft_index configuration_uncommitted_index; @@ -711,6 +717,7 @@ struct raft raft_id id; char *address; } current_leader; + uint64_t reserved[8]; /* Future use */ } follower_state; struct { @@ -718,6 +725,7 @@ struct raft bool *votes; /* Vote results. */ bool disrupt_leader; /* For leadership transfer */ bool in_pre_vote; /* True in pre-vote phase. */ + uint64_t reserved[8]; /* Future use */ } candidate_state; struct { @@ -728,6 +736,7 @@ struct raft raft_index round_index; /* Target of the current round. */ raft_time round_start; /* Start of current round. */ void *requests[2]; /* Outstanding client requests. */ + uint64_t reserved[8]; /* Future use */ } leader_state; }; @@ -753,6 +762,7 @@ struct raft unsigned trailing; /* N. of trailing entries to retain */ struct raft_snapshot pending; /* In progress snapshot */ struct raft_io_snapshot_put put; /* Store snapshot request */ + uint64_t reserved[8]; /* Future use */ } snapshot; /* @@ -774,6 +784,9 @@ struct raft * being promoted to voter. */ unsigned max_catch_up_rounds; unsigned max_catch_up_round_duration; + + /* Future extensions */ + uint64_t reserved[32]; }; RAFT_API int raft_init(struct raft *r, @@ -904,12 +917,21 @@ RAFT_API raft_index raft_last_index(struct raft *r); */ RAFT_API raft_index raft_last_applied(struct raft *r); -/* Common fields across client request types. */ -#define RAFT__REQUEST \ - void *data; \ - int type; \ - raft_index index; \ - void *queue[2] +/** + * Common fields across client request types. + * `req_id`, `client_id` and `unique_id` are currently unused. + * `reserved` fields should be replaced by new members with the same size + * and alignment requirements as `uint64_t`. + */ +#define RAFT__REQUEST \ + void *data; \ + int type; \ + raft_index index; \ + void *queue[2]; \ + uint8_t req_id[16]; \ + uint8_t client_id[16]; \ + uint8_t unique_id[16]; \ + uint64_t reserved[4] \ /** * Asynchronous request to append a new command entry to the log and apply it to @@ -975,7 +997,7 @@ RAFT_API int raft_barrier(struct raft *r, typedef void (*raft_change_cb)(struct raft_change *req, int status); struct raft_change { - void *data; + RAFT__REQUEST; raft_change_cb cb; }; @@ -1015,7 +1037,7 @@ RAFT_API int raft_remove(struct raft *r, typedef void (*raft_transfer_cb)(struct raft_transfer *req); struct raft_transfer { - void *data; /* User data */ + RAFT__REQUEST; raft_id id; /* ID of target server. */ raft_time start; /* Start of leadership transfer. */ struct raft_io_send send; /* For sending TimeoutNow */ diff --git a/include/raft/fixture.h b/include/raft/fixture.h index de62bd648..56f242296 100644 --- a/include/raft/fixture.h +++ b/include/raft/fixture.h @@ -8,6 +8,8 @@ #include "../raft.h" +#include + #define RAFT_FIXTURE_MAX_SERVERS 8 /** @@ -23,24 +25,22 @@ enum { /** * State of a single server in a cluster fixture. */ -struct raft_fixture_server -{ - bool alive; /* If false, the server is down. */ - raft_id id; /* Server ID. */ - char address[16]; /* Server address (stringified ID). */ - struct raft_tracer tracer; /* Tracer. */ - struct raft_io io; /* In-memory raft_io implementation. */ - struct raft raft; /* Raft instance. */ -}; +struct raft_fixture_server; /** * Information about a test cluster event triggered by the fixture. */ -struct raft_fixture_event -{ - unsigned server_index; /* Index of the server the event occurred on. */ - int type; /* Type of the event. */ -}; +struct raft_fixture_event; + +/** + * Returns the type of the event. + */ +int raft_fixture_event_type(struct raft_fixture_event *event); + +/** + * Returns the server index of the event. + */ +unsigned raft_fixture_event_server_index(struct raft_fixture_event *event); /** * Event callback. See raft_fixture_hook(). @@ -64,34 +64,22 @@ typedef void (*raft_fixture_event_cb)(struct raft_fixture *f, */ struct raft_fixture { - raft_time time; /* Global time, common to all servers. */ - unsigned n; /* Number of servers. */ - raft_id leader_id; /* ID of current leader, or 0 if none. */ - struct raft_log log; /* Copy of current leader's log. */ - raft_index commit_index; /* Current commit index on leader. */ - struct raft_fixture_event event; /* Last event occurred. */ - raft_fixture_event_cb hook; /* Event callback. */ - struct raft_fixture_server servers[RAFT_FIXTURE_MAX_SERVERS]; + raft_time time; /* Global time, common to all servers. */ + unsigned n; /* Number of servers. */ + raft_id leader_id; /* ID of current leader, or 0 if none. */ + struct raft_log *log; /* Copy of current leader's log. */ + raft_index commit_index; /* Current commit index on leader. */ + struct raft_fixture_event *event; /* Last event occurred. */ + raft_fixture_event_cb hook; /* Event callback. */ + struct raft_fixture_server *servers[RAFT_FIXTURE_MAX_SERVERS]; + uint64_t reserved[16]; /* For future expansion of struct. */ }; -/** - * !!! DEPRECATED users should use `raft_fixture_initialize`. !!! - * - * Initialize a raft cluster fixture with @n servers. Each server will use an - * in-memory @raft_io implementation and one of the given @fsms. All servers - * will be initially connected to one another, but they won't be bootstrapped or - * started. - */ -__attribute__((deprecated("use raft_fixture_initialize"))) -RAFT_API int raft_fixture_init(struct raft_fixture *f, - unsigned n, - struct raft_fsm *fsms); - /** * Initialize a raft cluster fixture. Servers can be added by using * `raft_fixture_grow`. */ -RAFT_API int raft_fixture_initialize(struct raft_fixture *f); +RAFT_API int raft_fixture_init(struct raft_fixture *f); /** * Release all memory used by the fixture. diff --git a/include/raft/uv.h b/include/raft/uv.h index 32ec36478..98b0248b3 100644 --- a/include/raft/uv.h +++ b/include/raft/uv.h @@ -177,8 +177,17 @@ typedef void (*raft_uv_transport_close_cb)(struct raft_uv_transport *t); * Interface to establish outgoing connections to other Raft servers and to * accept incoming connections from them. */ + struct raft_uv_transport { + /** + * Keep track of struct version, MUST be filled out by user. + * When moving to a new version, the user MUST implement the newly added + * methods. + * Latest version is 1. + */ + int version; + /** * User defined data. */ diff --git a/src/client.c b/src/client.c index ccd3e5cd3..93186a773 100644 --- a/src/client.c +++ b/src/client.c @@ -36,14 +36,14 @@ int raft_apply(struct raft *r, } /* Index of the first entry being appended. */ - index = logLastIndex(&r->log) + 1; + index = logLastIndex(r->log) + 1; tracef("%u commands starting at %lld", n, index); req->type = RAFT_COMMAND; req->index = index; req->cb = cb; /* Append the new entries to the log. */ - rv = logAppendCommands(&r->log, r->current_term, bufs, n); + rv = logAppendCommands(r->log, r->current_term, bufs, n); if (rv != 0) { goto err; } @@ -58,7 +58,7 @@ int raft_apply(struct raft *r, return 0; err_after_log_append: - logDiscard(&r->log, index); + logDiscard(r->log, index); QUEUE_REMOVE(&req->queue); err: assert(rv != 0); @@ -86,13 +86,13 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb) } /* Index of the barrier entry being appended. */ - index = logLastIndex(&r->log) + 1; + index = logLastIndex(r->log) + 1; tracef("barrier starting at %lld", index); req->type = RAFT_BARRIER; req->index = index; req->cb = cb; - rv = logAppend(&r->log, r->current_term, RAFT_BARRIER, &buf, NULL); + rv = logAppend(r->log, r->current_term, RAFT_BARRIER, &buf, NULL); if (rv != 0) { goto err_after_buf_alloc; } @@ -107,7 +107,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb) return 0; err_after_log_append: - logDiscard(&r->log, index); + logDiscard(r->log, index); QUEUE_REMOVE(&req->queue); err_after_buf_alloc: raft_free(buf.base); @@ -127,10 +127,10 @@ static int clientChangeConfiguration( (void)req; /* Index of the entry being appended. */ - index = logLastIndex(&r->log) + 1; + index = logLastIndex(r->log) + 1; /* Encode the new configuration and append it to the log. */ - rv = logAppendConfiguration(&r->log, term, configuration); + rv = logAppendConfiguration(r->log, term, configuration); if (rv != 0) { goto err; } @@ -160,7 +160,7 @@ static int clientChangeConfiguration( return 0; err_after_log_append: - logTruncate(&r->log, index); + logTruncate(r->log, index); err: assert(rv != 0); @@ -270,7 +270,7 @@ int raft_assign(struct raft *r, server_index = configurationIndexOf(&r->configuration, id); assert(server_index < r->configuration.n); - last_index = logLastIndex(&r->log); + last_index = logLastIndex(r->log); req->cb = cb; diff --git a/src/election.c b/src/election.c index abc1c82e1..0da8e67ae 100644 --- a/src/election.c +++ b/src/election.c @@ -74,8 +74,8 @@ static int electionSend(struct raft *r, const struct raft_server *server) message.type = RAFT_IO_REQUEST_VOTE; message.request_vote.term = term; message.request_vote.candidate_id = r->id; - message.request_vote.last_log_index = logLastIndex(&r->log); - message.request_vote.last_log_term = logLastTerm(&r->log); + message.request_vote.last_log_index = logLastIndex(r->log); + message.request_vote.last_log_term = logLastTerm(r->log); message.request_vote.disrupt_leader = r->candidate_state.disrupt_leader; message.request_vote.pre_vote = r->candidate_state.in_pre_vote; message.server_id = server->id; @@ -222,7 +222,7 @@ int electionVote(struct raft *r, * upon reception of the RequestVote RPC, meaning the 2 conditions will be * satisfied if the candidate's log is up-to-date. * */ - local_last_index = logLastIndex(&r->log); + local_last_index = logLastIndex(r->log); /* Our log is definitely not more up-to-date if it's empty! */ if (local_last_index == 0) { @@ -230,7 +230,7 @@ int electionVote(struct raft *r, goto grant_vote; } - local_last_term = logLastTerm(&r->log); + local_last_term = logLastTerm(r->log); if (args->last_log_term < local_last_term) { /* The requesting server has last entry's log term lower than ours. */ diff --git a/src/err.h b/src/err.h index 5a5688b88..1d2fa7e8a 100644 --- a/src/err.h +++ b/src/err.h @@ -24,7 +24,12 @@ X(RAFT_TOOBIG, "data is too big") \ X(RAFT_NOCONNECTION, "no connection to remote server available") \ X(RAFT_BUSY, "operation can't be performed at this time") \ - X(RAFT_IOERR, "I/O error") + X(RAFT_IOERR, "I/O error") \ + X(RAFT_NOTFOUND, "Resource not found") \ + X(RAFT_INVALID, "Invalid parameter") \ + X(RAFT_UNAUTHORIZED, "No access to resource") \ + X(RAFT_NOSPACE, "Not enough disk space") \ + X(RAFT_TOOMANY, "System or raft limit met or exceeded") /* Format an error message. */ #define ErrMsgPrintf(ERRMSG, ...) \ diff --git a/src/fixture.c b/src/fixture.c index ac5f34833..6ec676af4 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -30,6 +30,34 @@ * instance. This should be enough for testing purposes. */ #define MAX_PEERS 8 +struct raft_fixture_server +{ + bool alive; /* If false, the server is down. */ + raft_id id; /* Server ID. */ + char address[16]; /* Server address (stringified ID). */ + struct raft_tracer tracer; /* Tracer. */ + struct raft_io io; /* In-memory raft_io implementation. */ + struct raft raft; /* Raft instance. */ +}; + +struct raft_fixture_event +{ + unsigned server_index; /* Index of the server the event occurred on. */ + int type; /* Type of the event. */ +}; + +RAFT_API int raft_fixture_event_type(struct raft_fixture_event *event) +{ + assert(event != NULL); + return event->type; +} + +RAFT_API unsigned raft_fixture_event_server_index(struct raft_fixture_event *event) +{ + assert(event != NULL); + return event->server_index; +} + /* Fields common across all request types. */ #define REQUEST \ int type; /* Request code type. */ \ @@ -356,7 +384,7 @@ static void ioFlushSend(struct io *io, struct send *send) goto out; } - transmit = raft_malloc(sizeof *transmit); + transmit = raft_calloc(1, sizeof *transmit); assert(transmit != NULL); transmit->type = TRANSMIT; @@ -909,6 +937,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time) io->n_append = 0; raft_io->impl = io; + raft_io->version = 2; raft_io->init = ioMethodInit; raft_io->close = ioMethodClose; raft_io->start = ioMethodStart; @@ -961,7 +990,12 @@ static void emit(struct raft_tracer *t, static int serverInit(struct raft_fixture *f, unsigned i, struct raft_fsm *fsm) { int rv; - struct raft_fixture_server *s = &f->servers[i]; + struct raft_fixture_server *s; + s = raft_malloc(sizeof(*s)); + if (s == NULL) { + return RAFT_NOMEM; + } + f->servers[i] = s; s->alive = true; s->id = i + 1; sprintf(s->address, "%llu", s->id); @@ -987,6 +1021,7 @@ static void serverClose(struct raft_fixture_server *s) { raft_close(&s->raft, NULL); ioClose(&s->io); + raft_free(s); } /* Connect the server with the given index to all others */ @@ -994,8 +1029,8 @@ static void serverConnectToAll(struct raft_fixture *f, unsigned i) { unsigned j; for (j = 0; j < f->n; j++) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; if (i == j) { continue; } @@ -1003,41 +1038,19 @@ static void serverConnectToAll(struct raft_fixture *f, unsigned i) } } -int raft_fixture_init(struct raft_fixture *f, unsigned n, struct raft_fsm *fsms) +int raft_fixture_init(struct raft_fixture *f) { - unsigned i; - int rc; - assert(n >= 1); - f->time = 0; - f->n = n; - - /* Initialize all servers */ - for (i = 0; i < n; i++) { - rc = serverInit(f, i, &fsms[i]); - if (rc != 0) { - return rc; - } - } - - /* Connect all servers to each another */ - for (i = 0; i < f->n; i++) { - serverConnectToAll(f, i); + f->log = logInit(); + if (f->log == NULL) { + return RAFT_NOMEM; } - - logInit(&f->log); - f->commit_index = 0; - f->hook = NULL; - - return 0; -} - -int raft_fixture_initialize(struct raft_fixture *f) -{ - f->time = 0; - logInit(&f->log); f->commit_index = 0; f->hook = NULL; + f->event = raft_malloc(sizeof(*f->event)); + if (f->event == NULL) { + return RAFT_NOMEM; + } return 0; } @@ -1045,13 +1058,14 @@ void raft_fixture_close(struct raft_fixture *f) { unsigned i; for (i = 0; i < f->n; i++) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; ioFlushAll(io); } for (i = 0; i < f->n; i++) { - serverClose(&f->servers[i]); + serverClose(f->servers[i]); } - logClose(&f->log); + raft_free(f->event); + logClose(f->log); } int raft_fixture_configuration(struct raft_fixture *f, @@ -1067,7 +1081,7 @@ int raft_fixture_configuration(struct raft_fixture *f, struct raft_fixture_server *s; int role = i < n_voting ? RAFT_VOTER : RAFT_STANDBY; int rv; - s = &f->servers[i]; + s = f->servers[i]; rv = raft_configuration_add(configuration, s->id, s->address, role); if (rv != 0) { return rv; @@ -1096,7 +1110,7 @@ int raft_fixture_start(struct raft_fixture *f) unsigned i; int rv; for (i = 0; i < f->n; i++) { - struct raft_fixture_server *s = &f->servers[i]; + struct raft_fixture_server *s = f->servers[i]; rv = raft_start(&s->raft); if (rv != 0) { return rv; @@ -1118,13 +1132,13 @@ raft_time raft_fixture_time(struct raft_fixture *f) struct raft *raft_fixture_get(struct raft_fixture *f, unsigned i) { assert(i < f->n); - return &f->servers[i].raft; + return &f->servers[i]->raft; } bool raft_fixture_alive(struct raft_fixture *f, unsigned i) { assert(i < f->n); - return f->servers[i].alive; + return f->servers[i]->alive; } unsigned raft_fixture_leader_index(struct raft_fixture *f) @@ -1137,7 +1151,7 @@ unsigned raft_fixture_leader_index(struct raft_fixture *f) raft_id raft_fixture_voted_for(struct raft_fixture *f, unsigned i) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; return io->voted_for; } @@ -1266,7 +1280,7 @@ static void checkLeaderAppendOnly(struct raft_fixture *f) { struct raft *raft; raft_index index; - raft_index last = logLastIndex(&f->log); + raft_index last = logLastIndex(f->log); /* If the cached log is empty it means there was no leader before. */ if (last == 0) { @@ -1279,21 +1293,21 @@ static void checkLeaderAppendOnly(struct raft_fixture *f) } raft = raft_fixture_get(f, (unsigned)f->leader_id - 1); - last = logLastIndex(&f->log); + last = logLastIndex(f->log); for (index = 1; index <= last; index++) { const struct raft_entry *entry1; const struct raft_entry *entry2; size_t i; - entry1 = logGet(&f->log, index); - entry2 = logGet(&raft->log, index); + entry1 = logGet(f->log, index); + entry2 = logGet(raft->log, index); assert(entry1 != NULL); /* Check if the entry was snapshotted. */ if (entry2 == NULL) { - assert(raft->log.snapshot.last_index >= index); + assert(raft->log->snapshot.last_index >= index); continue; } @@ -1316,9 +1330,14 @@ static void copyLeaderLog(struct raft_fixture *f) unsigned n; size_t i; int rv; - logClose(&f->log); - logInit(&f->log); - rv = logAcquire(&raft->log, 1, &entries, &n); + logClose(f->log); + f->log = logInit(); + if (f->log == NULL) { + assert(false); + return; + } + + rv = logAcquire(raft->log, 1, &entries, &n); assert(rv == 0); for (i = 0; i < n; i++) { struct raft_entry *entry = &entries[i]; @@ -1327,10 +1346,10 @@ static void copyLeaderLog(struct raft_fixture *f) buf.base = raft_malloc(buf.len); assert(buf.base != NULL); memcpy(buf.base, entry->buf.base, buf.len); - rv = logAppend(&f->log, entry->term, entry->type, &buf, NULL); + rv = logAppend(f->log, entry->term, entry->type, &buf, NULL); assert(rv == 0); } - logRelease(&raft->log, 1, entries, n); + logRelease(raft->log, 1, entries, n); } /* Update the commit index to match the one from the current leader. */ @@ -1349,7 +1368,7 @@ static void getLowestTickTime(struct raft_fixture *f, raft_time *t, unsigned *i) unsigned j; *t = (raft_time)-1 /* Maximum value */; for (j = 0; j < f->n; j++) { - struct io *io = f->servers[j].io.impl; + struct io *io = f->servers[j]->io.impl; if (io->next_tick < *t) { *t = io->next_tick; *i = j; @@ -1366,7 +1385,7 @@ static void getLowestRequestCompletionTime(struct raft_fixture *f, unsigned j; *t = (raft_time)-1 /* Maximum value */; for (j = 0; j < f->n; j++) { - struct io *io = f->servers[j].io.impl; + struct io *io = f->servers[j]->io.impl; queue *head; QUEUE_FOREACH(head, &io->requests) { @@ -1382,12 +1401,12 @@ static void getLowestRequestCompletionTime(struct raft_fixture *f, /* Fire the tick callback of the i'th server. */ static void fireTick(struct raft_fixture *f, unsigned i) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; f->time = io->next_tick; - f->event.server_index = i; - f->event.type = RAFT_FIXTURE_TICK; + f->event->server_index = i; + f->event->type = RAFT_FIXTURE_TICK; io->next_tick += io->tick_interval; - if (f->servers[i].alive) { + if (f->servers[i]->alive) { io->tick_cb(io->io); } } @@ -1395,12 +1414,12 @@ static void fireTick(struct raft_fixture *f, unsigned i) /* Complete the first request with completion time @t on the @i'th server. */ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; queue *head; struct ioRequest *r = NULL; bool found = false; f->time = t; - f->event.server_index = i; + f->event->server_index = i; QUEUE_FOREACH(head, &io->requests) { r = QUEUE_DATA(head, struct ioRequest, queue); @@ -1414,27 +1433,27 @@ static void completeRequest(struct raft_fixture *f, unsigned i, raft_time t) switch (r->type) { case APPEND: ioFlushAppend(io, (struct append *)r); - f->event.type = RAFT_FIXTURE_DISK; + f->event->type = RAFT_FIXTURE_DISK; break; case SEND: ioFlushSend(io, (struct send *)r); - f->event.type = RAFT_FIXTURE_NETWORK; + f->event->type = RAFT_FIXTURE_NETWORK; break; case TRANSMIT: ioDeliverTransmit(io, (struct transmit *)r); - f->event.type = RAFT_FIXTURE_NETWORK; + f->event->type = RAFT_FIXTURE_NETWORK; break; case SNAPSHOT_PUT: ioFlushSnapshotPut(io, (struct snapshot_put *)r); - f->event.type = RAFT_FIXTURE_DISK; + f->event->type = RAFT_FIXTURE_DISK; break; case SNAPSHOT_GET: ioFlushSnapshotGet(io, (struct snapshot_get *)r); - f->event.type = RAFT_FIXTURE_DISK; + f->event->type = RAFT_FIXTURE_DISK; break; case ASYNC_WORK: ioFlushAsyncWork(io, (struct async_work *)r); - f->event.type = RAFT_FIXTURE_WORK; + f->event->type = RAFT_FIXTURE_WORK; break; default: assert(0); @@ -1473,10 +1492,10 @@ struct raft_fixture_event *raft_fixture_step(struct raft_fixture *f) } if (f->hook != NULL) { - f->hook(f, &f->event); + f->hook(f, f->event); } - return &f->event; + return f->event; } struct raft_fixture_event *raft_fixture_step_n(struct raft_fixture *f, @@ -1549,7 +1568,7 @@ static void dropAllExcept(struct raft_fixture *f, { unsigned j; for (j = 0; j < f->n; j++) { - struct raft_fixture_server *s = &f->servers[j]; + struct raft_fixture_server *s = f->servers[j]; if (j == i) { continue; } @@ -1562,7 +1581,7 @@ static void dropAllExcept(struct raft_fixture *f, static void minimizeRandomizedElectionTimeout(struct raft_fixture *f, unsigned i) { - struct raft *raft = &f->servers[i].raft; + struct raft *raft = &f->servers[i]->raft; raft_time now = raft->io->time(raft->io); unsigned timeout = raft->election_timeout; assert(raft->state == RAFT_FOLLOWER); @@ -1583,7 +1602,7 @@ static void maximizeAllRandomizedElectionTimeoutsExcept(struct raft_fixture *f, { unsigned j; for (j = 0; j < f->n; j++) { - struct raft *raft = &f->servers[j].raft; + struct raft *raft = &f->servers[j]->raft; unsigned timeout = raft->election_timeout * 2; if (j == i) { continue; @@ -1612,7 +1631,7 @@ void raft_fixture_elect(struct raft_fixture *f, unsigned i) /* Make sure all servers are currently followers. */ for (j = 0; j < f->n; j++) { - assert(raft_state(&f->servers[j].raft) == RAFT_FOLLOWER); + assert(raft_state(&f->servers[j]->raft) == RAFT_FOLLOWER); } /* Pretend that the last randomized election timeout was set at the maximum @@ -1632,7 +1651,7 @@ void raft_fixture_depose(struct raft_fixture *f) /* Make sure there's a leader. */ assert(f->leader_id != 0); leader_i = (unsigned)f->leader_id - 1; - assert(raft_state(&f->servers[leader_i].raft) == RAFT_LEADER); + assert(raft_state(&f->servers[leader_i]->raft) == RAFT_LEADER); /* Set a very large election timeout on all followers, to prevent them from * starting an election. */ @@ -1799,22 +1818,22 @@ bool raft_fixture_step_until_delivered(struct raft_fixture *f, void raft_fixture_disconnect(struct raft_fixture *f, unsigned i, unsigned j) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; ioDisconnect(io1, io2); } void raft_fixture_reconnect(struct raft_fixture *f, unsigned i, unsigned j) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; ioReconnect(io1, io2); } void raft_fixture_saturate(struct raft_fixture *f, unsigned i, unsigned j) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; ioSaturate(io1, io2); } @@ -1838,7 +1857,7 @@ static void reconnectToAll(struct raft_fixture *f, unsigned i) continue; } /* Don't reconnect to disconnected peers */ - if (!f->servers[j].alive) { + if (!f->servers[j]->alive) { continue; } raft_fixture_desaturate(f, i, j); @@ -1848,28 +1867,28 @@ static void reconnectToAll(struct raft_fixture *f, unsigned i) bool raft_fixture_saturated(struct raft_fixture *f, unsigned i, unsigned j) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; return ioSaturated(io1, io2); } void raft_fixture_desaturate(struct raft_fixture *f, unsigned i, unsigned j) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; ioDesaturate(io1, io2); } void raft_fixture_kill(struct raft_fixture *f, unsigned i) { disconnectFromAll(f, i); - f->servers[i].alive = false; + f->servers[i]->alive = false; } void raft_fixture_revive(struct raft_fixture *f, unsigned i) { reconnectToAll(f, i); - f->servers[i].alive = true; + f->servers[i]->alive = true; } int raft_fixture_grow(struct raft_fixture *f, struct raft_fsm *fsm) @@ -1887,8 +1906,8 @@ int raft_fixture_grow(struct raft_fixture *f, struct raft_fsm *fsm) serverConnectToAll(f, i); for (j = 0; j < f->n; j++) { - struct raft_io *io1 = &f->servers[i].io; - struct raft_io *io2 = &f->servers[j].io; + struct raft_io *io1 = &f->servers[i]->io; + struct raft_io *io2 = &f->servers[j]->io; ioConnect(io2, io1); } @@ -1899,7 +1918,7 @@ void raft_fixture_set_randomized_election_timeout(struct raft_fixture *f, unsigned i, unsigned msecs) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; io->randomized_election_timeout = msecs; } @@ -1907,7 +1926,7 @@ void raft_fixture_set_network_latency(struct raft_fixture *f, unsigned i, unsigned msecs) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; io->network_latency = msecs; } @@ -1915,13 +1934,13 @@ void raft_fixture_set_disk_latency(struct raft_fixture *f, unsigned i, unsigned msecs) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; io->disk_latency = msecs; } void raft_fixture_set_term(struct raft_fixture *f, unsigned i, raft_term term) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; io->term = term; } @@ -1929,7 +1948,7 @@ void raft_fixture_set_snapshot(struct raft_fixture *f, unsigned i, struct raft_snapshot *snapshot) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; io->snapshot = snapshot; } @@ -1937,7 +1956,7 @@ void raft_fixture_add_entry(struct raft_fixture *f, unsigned i, struct raft_entry *entry) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; struct raft_entry *entries; entries = raft_realloc(io->entries, (io->n + 1) * sizeof *entries); assert(entries != NULL); @@ -1951,20 +1970,20 @@ void raft_fixture_io_fault(struct raft_fixture *f, int delay, int repeat) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; io->fault.countdown = delay; io->fault.n = repeat; } unsigned raft_fixture_n_send(struct raft_fixture *f, unsigned i, int type) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; return io->n_send[type]; } unsigned raft_fixture_n_recv(struct raft_fixture *f, unsigned i, int type) { - struct io *io = f->servers[i].io.impl; + struct io *io = f->servers[i]->io.impl; return io->n_recv[type]; } diff --git a/src/log.c b/src/log.c index d3ce9878d..919e7472d 100644 --- a/src/log.c +++ b/src/log.c @@ -335,17 +335,26 @@ static bool refsDecr(struct raft_log *l, return true; } -void logInit(struct raft_log *l) +struct raft_log* logInit(void) { - assert(l != NULL); - l->entries = NULL; - l->size = 0; - l->front = l->back = 0; - l->offset = 0; - l->refs = NULL; - l->refs_size = 0; - l->snapshot.last_index = 0; - l->snapshot.last_term = 0; + struct raft_log *log; + + log = raft_malloc(sizeof(*log)); + if (log == NULL) { + return NULL; + } + + + log->entries = NULL; + log->size = 0; + log->front = log->back = 0; + log->offset = 0; + log->refs = NULL; + log->refs_size = 0; + log->snapshot.last_index = 0; + log->snapshot.last_term = 0; + + return log; } /* Return the index of the i'th entry in the log. */ @@ -411,6 +420,8 @@ void logClose(struct raft_log *l) if (l->refs != NULL) { raft_free(l->refs); } + + raft_free(l); } void logStart(struct raft_log *l, diff --git a/src/log.h b/src/log.h index 320302106..ebe008b40 100644 --- a/src/log.h +++ b/src/log.h @@ -8,8 +8,53 @@ /* Initial size of the entry reference count hash table. */ #define LOG__REFS_INITIAL_SIZE 256 + +/** + * Counter for outstanding references to a log entry. + * + * When an entry is first appended to the log, its refcount is set to one (the + * log itself is the only one referencing the entry). Whenever an entry is + * included in an I/O request (to write it to disk or to send it to other + * servers) its refcount is increased by one. Whenever an entry gets deleted + * from the log its refcount is decreased by one. Likewise, whenever an I/O + * request is completed the refcount of the relevant entries is decreased by + * one. When the refcount drops to zero the memory that its @buf attribute + * points to gets released, or, if the @batch attribute is non-NULL, a check is + * made to see if all other entries of the same batch also have a zero refcount, + * and the memory that @batch points to gets released if that's the case. + */ +struct raft_entry_ref +{ + raft_term term; /* Term of the entry being ref-counted. */ + raft_index index; /* Index of the entry being ref-counted. */ + unsigned short count; /* Number of references. */ + struct raft_entry_ref *next; /* Next item in the bucket (for collisions). */ +}; + +/** + * In-memory cache of the persistent raft log stored on disk. + * + * The raft log cache is implemented as a circular buffer of log entries, which + * makes some frequent operations very efficient (e.g. deleting the first N + * entries when snapshotting). + */ +struct raft_log +{ + struct raft_entry *entries; /* Circular buffer of log entries. */ + size_t size; /* Number of available slots in the buffer. */ + size_t front, back; /* Indexes of used slots [front, back). */ + raft_index offset; /* Index of first entry is offset+1. */ + struct raft_entry_ref *refs; /* Log entries reference counts hash table. */ + size_t refs_size; /* Size of the reference counts hash table. */ + struct /* Information about last snapshot, or zero. */ + { + raft_index last_index; /* Snapshot replaces all entries up to here. */ + raft_term last_term; /* Term of last index. */ + } snapshot; +}; + /* Initialize an empty in-memory log of raft entries. */ -void logInit(struct raft_log *l); +struct raft_log* logInit(void); /* Release all memory used by the given log object. */ void logClose(struct raft_log *l); diff --git a/src/membership.c b/src/membership.c index d7449ae05..5e1ad6936 100644 --- a/src/membership.c +++ b/src/membership.c @@ -38,7 +38,7 @@ int membershipCanChangeConfiguration(struct raft *r) /* The index of the last committed configuration can't be greater than the * last log index. */ - assert(logLastIndex(&r->log) >= r->configuration_index); + assert(logLastIndex(r->log) >= r->configuration_index); /* No catch-up round should be in progress. */ assert(r->leader_state.round_number == 0); @@ -80,7 +80,7 @@ bool membershipUpdateCatchUpRound(struct raft *r) return false; } - last_index = logLastIndex(&r->log); + last_index = logLastIndex(r->log); round_duration = now - r->leader_state.round_start; is_up_to_date = match_index == last_index; @@ -155,7 +155,7 @@ int membershipRollback(struct raft *r) /* Fetch the last committed configuration entry. */ assert(r->configuration_index != 0); - entry = logGet(&r->log, r->configuration_index); + entry = logGet(r->log, r->configuration_index); assert(entry != NULL); @@ -204,8 +204,8 @@ int membershipLeadershipTransferStart(struct raft *r) message.server_id = server->id; message.server_address = server->address; message.timeout_now.term = r->current_term; - message.timeout_now.last_log_index = logLastIndex(&r->log); - message.timeout_now.last_log_term = logLastTerm(&r->log); + message.timeout_now.last_log_index = logLastIndex(r->log); + message.timeout_now.last_log_term = logLastTerm(r->log); r->transfer->send.data = r; rv = r->io->send(r->io, &r->transfer->send, &message, NULL); if (rv != 0) { diff --git a/src/progress.c b/src/progress.c index 2d5969b88..95bcb6e94 100644 --- a/src/progress.c +++ b/src/progress.c @@ -31,7 +31,7 @@ int progressBuildArray(struct raft *r) { struct raft_progress *progress; unsigned i; - raft_index last_index = logLastIndex(&r->log); + raft_index last_index = logLastIndex(r->log); progress = raft_malloc(r->configuration.n * sizeof *progress); if (progress == NULL) { return RAFT_NOMEM; @@ -49,7 +49,7 @@ int progressBuildArray(struct raft *r) int progressRebuildArray(struct raft *r, const struct raft_configuration *configuration) { - raft_index last_index = logLastIndex(&r->log); + raft_index last_index = logLastIndex(r->log); struct raft_progress *progress; unsigned i; unsigned j; @@ -97,7 +97,7 @@ int progressRebuildArray(struct raft *r, bool progressIsUpToDate(struct raft *r, unsigned i) { struct raft_progress *p = &r->leader_state.progress[i]; - raft_index last_index = logLastIndex(&r->log); + raft_index last_index = logLastIndex(r->log); return p->next_index == last_index + 1; } @@ -106,7 +106,7 @@ bool progressShouldReplicate(struct raft *r, unsigned i) struct raft_progress *p = &r->leader_state.progress[i]; raft_time now = r->io->time(r->io); bool needs_heartbeat = now - p->last_send >= r->heartbeat_timeout; - raft_index last_index = logLastIndex(&r->log); + raft_index last_index = logLastIndex(r->log); bool result = false; /* We must be in a valid state. */ @@ -183,7 +183,7 @@ void progressToSnapshot(struct raft *r, unsigned i) { struct raft_progress *p = &r->leader_state.progress[i]; p->state = PROGRESS__SNAPSHOT; - p->snapshot_index = logSnapshotIndex(&r->log); + p->snapshot_index = logSnapshotIndex(r->log); } void progressAbortSnapshot(struct raft *r, const unsigned i) diff --git a/src/progress.h b/src/progress.h index 311c55db1..229a5a353 100644 --- a/src/progress.h +++ b/src/progress.h @@ -12,6 +12,20 @@ enum { PROGRESS__SNAPSHOT /* Sending a snapshot */ }; +/** + * Used by leaders to keep track of replication progress for each server. + */ +struct raft_progress +{ + unsigned short state; /* Probe, pipeline or snapshot. */ + raft_index next_index; /* Next entry to send. */ + raft_index match_index; /* Highest index reported as replicated. */ + raft_index snapshot_index; /* Last index of most recent snapshot sent. */ + raft_time last_send; /* Timestamp of last AppendEntries RPC. */ + raft_time snapshot_last_send; /* Timestamp of last InstallSnaphot RPC. */ + bool recent_recv; /* A msg was received within election timeout. */ +}; + /* Create and initialize the array of progress objects used by the leader to * * track followers. The match index will be set to zero, and the next index to * the current last index plus 1. */ diff --git a/src/raft.c b/src/raft.c index 071b822f2..282184d13 100644 --- a/src/raft.c +++ b/src/raft.c @@ -24,9 +24,14 @@ #define DEFAULT_MAX_CATCH_UP_ROUNDS 10 #define DEFAULT_MAX_CATCH_UP_ROUND_DURATION (5 * 1000) -static int ioFsmCompat(struct raft *r, - struct raft_io *io, - struct raft_fsm *fsm); +int raft_version_number (void) +{ + return RAFT_VERSION_NUMBER; +} + +static int ioFsmVersionCheck(struct raft *r, + struct raft_io *io, + struct raft_fsm *fsm); int raft_init(struct raft *r, struct raft_io *io, @@ -37,7 +42,7 @@ int raft_init(struct raft *r, int rv; assert(r != NULL); - rv = ioFsmCompat(r, io, fsm); + rv = ioFsmVersionCheck(r, io, fsm); if (rv != 0) { goto err; } @@ -59,7 +64,12 @@ int raft_init(struct raft *r, strcpy(r->address, address); r->current_term = 0; r->voted_for = 0; - logInit(&r->log); + r->log = logInit(); + if (r->log == NULL) { + rv = RAFT_NOMEM; + goto err_after_address_alloc; + } + raft_configuration_init(&r->configuration); r->configuration_index = 0; r->configuration_uncommitted_index = 0; @@ -98,7 +108,7 @@ static void ioCloseCb(struct raft_io *io) { struct raft *r = io->data; raft_free(r->address); - logClose(&r->log); + logClose(r->log); raft_configuration_close(&r->configuration); if (r->close_cb != NULL) { r->close_cb(r); @@ -238,10 +248,20 @@ unsigned long long raft_digest(const char *text, unsigned long long n) return byteFlip64(digest); } -static int ioFsmCompat(struct raft *r, - struct raft_io *io, - struct raft_fsm *fsm) +static int ioFsmVersionCheck(struct raft *r, + struct raft_io *io, + struct raft_fsm *fsm) { + if (io->version == 0) { + ErrMsgPrintf(r->errmsg, "io->version must be set"); + return -1; + } + + if (fsm->version == 0) { + ErrMsgPrintf(r->errmsg, "fsm->version must be set"); + return -1; + } + if ((fsm->version > 2 && fsm->snapshot_async != NULL) && ((io->version < 2) || (io->async_work == NULL))) { ErrMsgPrintf(r->errmsg, diff --git a/src/recv.c b/src/recv.c index ea8bd2f0d..fd8b4ded6 100644 --- a/src/recv.c +++ b/src/recv.c @@ -22,15 +22,6 @@ static int recvMessage(struct raft *r, struct raft_message *message) { int rv = 0; - if (message->type < RAFT_IO_APPEND_ENTRIES || - message->type > RAFT_IO_TIMEOUT_NOW) { - tracef("received unknown message type type: %d", message->type); - return 0; - } - - /* tracef("%s from server %ld", message_descs[message->type - 1], - message->server_id); */ - switch (message->type) { case RAFT_IO_APPEND_ENTRIES: rv = recvAppendEntries(r, message->server_id, @@ -70,6 +61,10 @@ static int recvMessage(struct raft *r, struct raft_message *message) rv = recvTimeoutNow(r, message->server_id, message->server_address, &message->timeout_now); break; + default: + tracef("received unknown message type (%d)", message->type); + /* Drop message */ + return 0; }; if (rv != 0 && rv != RAFT_NOCONNECTION) { diff --git a/src/recv_append_entries.c b/src/recv_append_entries.c index 83a2da62e..6244e9432 100644 --- a/src/recv_append_entries.c +++ b/src/recv_append_entries.c @@ -37,7 +37,8 @@ int recvAppendEntries(struct raft *r, r->id, id, address, args->leader_commit, args->n_entries, args->prev_log_index, args->prev_log_term, args->term); result->rejected = args->prev_log_index; - result->last_log_index = logLastIndex(&r->log); + result->last_log_index = logLastIndex(r->log); + result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION; rv = recvEnsureMatchingTerms(r, args->term, &match); if (rv != 0) { diff --git a/src/recv_install_snapshot.c b/src/recv_install_snapshot.c index 41df8a695..15836ba53 100644 --- a/src/recv_install_snapshot.c +++ b/src/recv_install_snapshot.c @@ -32,7 +32,8 @@ int recvInstallSnapshot(struct raft *r, r->id, id, address, args->conf_index, args->last_index, args->last_term, args->term); result->rejected = args->last_index; - result->last_log_index = logLastIndex(&r->log); + result->last_log_index = logLastIndex(r->log); + result->version = RAFT_APPEND_ENTRIES_RESULT_VERSION; rv = recvEnsureMatchingTerms(r, args->term, &match); if (rv != 0) { diff --git a/src/recv_request_vote.c b/src/recv_request_vote.c index f062abee0..6488ee631 100644 --- a/src/recv_request_vote.c +++ b/src/recv_request_vote.c @@ -34,7 +34,8 @@ int recvRequestVote(struct raft *r, r->id, id, address, args->candidate_id, args->disrupt_leader, args->last_log_index, args->last_log_term, args->pre_vote, args->term); result->vote_granted = false; - result->pre_vote = TO_RAFT_TRIBOOL(args->pre_vote); + result->pre_vote = args->pre_vote; + result->version = RAFT_REQUEST_VOTE_RESULT_VERSION; /* Reject the request if we have a leader. * diff --git a/src/recv_request_vote_result.c b/src/recv_request_vote_result.c index dc0a83c87..ec81b0170 100644 --- a/src/recv_request_vote_result.c +++ b/src/recv_request_vote_result.c @@ -24,8 +24,9 @@ int recvRequestVoteResult(struct raft *r, assert(r != NULL); assert(id > 0); - tracef("self:%llu from:%llu@%s term:%llu vote_granted:%d pre_vote:%d", - r->id, id, address, result->term, result->vote_granted, result->pre_vote); + tracef("self:%llu from:%llu@%s term:%llu vote_granted:%d pre_vote:%d version:%d", + r->id, id, address, result->term, result->vote_granted, result->pre_vote, + result->version); votes_index = configurationIndexOfVoter(&r->configuration, id); if (votes_index == r->configuration.n) { tracef("non-voting or unknown server -> reject"); @@ -66,7 +67,7 @@ int recvRequestVoteResult(struct raft *r, } /* Avoid counting pre-vote votes as regular votes. */ - if (!r->candidate_state.in_pre_vote && result->pre_vote == raft_tribool_true) { + if (result->version > 1 && result->pre_vote && !r->candidate_state.in_pre_vote) { tracef("receive stale pre-vote response -> ignore"); return 0; } @@ -75,7 +76,7 @@ int recvRequestVoteResult(struct raft *r, * sends real RequestVote RPCs, crashes, comes online, starts a pre-vote * and then receives the response to the RequestVote RPC it sent * out before crashing. */ - if (r->candidate_state.in_pre_vote && result->pre_vote == raft_tribool_false) { + if (result->version > 1 && !result->pre_vote && r->candidate_state.in_pre_vote) { tracef("receive vote response during pre-vote -> ignore"); return 0; } diff --git a/src/recv_timeout_now.c b/src/recv_timeout_now.c index 80f8b398c..211a33eba 100644 --- a/src/recv_timeout_now.c +++ b/src/recv_timeout_now.c @@ -53,8 +53,8 @@ int recvTimeoutNow(struct raft *r, } /* Ignore the request if we our log is not up-to-date. */ - local_last_index = logLastIndex(&r->log); - local_last_term = logLastTerm(&r->log); + local_last_index = logLastIndex(r->log); + local_last_term = logLastTerm(r->log); if (local_last_index != args->last_log_index || local_last_term != args->last_log_term) { return 0; diff --git a/src/replication.c b/src/replication.c index 4b80d4f18..d048e88fa 100644 --- a/src/replication.c +++ b/src/replication.c @@ -58,7 +58,7 @@ static void sendAppendEntriesCb(struct raft_io_send *send, const int status) } /* Tell the log that we're done referencing these entries. */ - logRelease(&r->log, req->index, req->entries, req->n); + logRelease(r->log, req->index, req->entries, req->n); raft_free(req); } @@ -81,7 +81,7 @@ static int sendAppendEntries(struct raft *r, args->prev_log_term = prev_term; /* TODO: implement a limit to the total size of the entries being sent */ - rv = logAcquire(&r->log, next_index, &args->entries, &args->n_entries); + rv = logAcquire(r->log, next_index, &args->entries, &args->n_entries); if (rv != 0) { goto err; } @@ -98,7 +98,7 @@ static int sendAppendEntries(struct raft *r, tracef("send %u entries starting at %llu to server %llu (last index %llu)", args->n_entries, args->prev_log_index, server->id, - logLastIndex(&r->log)); + logLastIndex(r->log)); message.type = RAFT_IO_APPEND_ENTRIES; message.server_id = server->id; @@ -132,7 +132,7 @@ static int sendAppendEntries(struct raft *r, err_after_req_alloc: raft_free(req); err_after_entries_acquired: - logRelease(&r->log, next_index, args->entries, args->n_entries); + logRelease(r->log, next_index, args->entries, args->n_entries); err: assert(rv != 0); return rv; @@ -287,7 +287,7 @@ int replicationProgress(struct raft *r, unsigned i) { struct raft_server *server = &r->configuration.servers[i]; bool progress_state_is_snapshot = progressState(r, i) == PROGRESS__SNAPSHOT; - raft_index snapshot_index = logSnapshotIndex(&r->log); + raft_index snapshot_index = logSnapshotIndex(r->log); raft_index next_index = progressNextIndex(r, i); raft_index prev_index; raft_term prev_term; @@ -318,7 +318,7 @@ int replicationProgress(struct raft *r, unsigned i) * null. If the first entry is not available anymore, send the last * snapshot if we're not already sending one. */ if (snapshot_index > 0 && !progress_state_is_snapshot) { - raft_index last_index = logLastIndex(&r->log); + raft_index last_index = logLastIndex(r->log); assert(last_index > 0); /* The log can't be empty */ goto send_snapshot; } @@ -328,7 +328,7 @@ int replicationProgress(struct raft *r, unsigned i) /* Set prevIndex and prevTerm to the index and term of the entry at * next_index - 1. */ prev_index = next_index - 1; - prev_term = logTermOf(&r->log, prev_index); + prev_term = logTermOf(r->log, prev_index); /* If the entry is not anymore in our log, send the last snapshot if we're * not doing so already. */ if (prev_term == 0 && !progress_state_is_snapshot) { @@ -340,8 +340,8 @@ int replicationProgress(struct raft *r, unsigned i) /* Send empty AppendEntries RPC when installing a snaphot */ if (progress_state_is_snapshot) { - prev_index = logLastIndex(&r->log); - prev_term = logLastTerm(&r->log); + prev_index = logLastIndex(r->log); + prev_term = logLastTerm(r->log); } return sendAppendEntries(r, i, prev_index, prev_term); @@ -352,8 +352,8 @@ int replicationProgress(struct raft *r, unsigned i) return sendSnapshot(r, i); } else { /* Send empty AppendEntries RPC when we haven't heard from the server */ - prev_index = logLastIndex(&r->log); - prev_term = logLastTerm(&r->log); + prev_index = logLastIndex(r->log); + prev_term = logLastTerm(r->log); return sendAppendEntries(r, i, prev_index, prev_term); } } @@ -421,7 +421,7 @@ static size_t updateLastStored(struct raft *r, for (i = 0; i < n_entries; i++) { struct raft_entry *entry = &entries[i]; raft_index index = first_index + i; - raft_term local_term = logTermOf(&r->log, index); + raft_term local_term = logTermOf(r->log, index); /* If we have no entry at this index, or if the entry we have now has a * different term, it means that this entry got truncated, so let's stop @@ -527,11 +527,11 @@ static void appendLeaderCb(struct raft_io_append *req, int status) out: /* Tell the log that we're done referencing these entries. */ - logRelease(&r->log, request->index, request->entries, request->n); + logRelease(r->log, request->index, request->entries, request->n); index = request->index; raft_free(request); if (status != 0) { - logTruncate(&r->log, index); + logTruncate(r->log, index); convertToFollower(r); } } @@ -549,7 +549,7 @@ static int appendLeader(struct raft *r, raft_index index) assert(index > r->last_stored); /* Acquire all the entries from the given index onwards. */ - rv = logAcquire(&r->log, index, &entries, &n); + rv = logAcquire(r->log, index, &entries, &n); if (rv != 0) { goto err; } @@ -588,7 +588,7 @@ static int appendLeader(struct raft *r, raft_index index) err_after_request_alloc: raft_free(request); err_after_entries_acquired: - logRelease(&r->log, index, entries, n); + logRelease(r->log, index, entries, n); err: assert(rv != 0); return rv; @@ -637,10 +637,10 @@ static int triggerActualPromotion(struct raft *r) server->role = RAFT_VOTER; /* Index of the entry being appended. */ - index = logLastIndex(&r->log) + 1; + index = logLastIndex(r->log) + 1; /* Encode the new configuration and append it to the log. */ - rv = logAppendConfiguration(&r->log, term, &r->configuration); + rv = logAppendConfiguration(r->log, term, &r->configuration); if (rv != 0) { goto err; } @@ -652,12 +652,12 @@ static int triggerActualPromotion(struct raft *r) } r->leader_state.promotee_id = 0; - r->configuration_uncommitted_index = logLastIndex(&r->log); + r->configuration_uncommitted_index = logLastIndex(r->log); return 0; err_after_log_append: - logTruncate(&r->log, index); + logTruncate(r->log, index); err: server->role = old_role; @@ -707,8 +707,8 @@ int replicationUpdate(struct raft *r, * value of prevLogIndex + len(entriesToAppend). If it has a longer log, it * might be a leftover from previous terms. */ last_index = result->last_log_index; - if (last_index > logLastIndex(&r->log)) { - last_index = logLastIndex(&r->log); + if (last_index > logLastIndex(r->log)) { + last_index = logLastIndex(r->log); } /* If the RPC succeeded, update our counters for this server. @@ -879,7 +879,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status) for (j = 0; j < i; j++) { struct raft_entry *entry = &args->entries[j]; raft_index index = request->index + j; - raft_term local_term = logTermOf(&r->log, index); + raft_term local_term = logTermOf(r->log, index); assert(local_term != 0 && local_term == entry->term); @@ -917,7 +917,7 @@ static void appendFollowerCb(struct raft_io_append *req, int status) sendAppendEntriesResult(r, &result); out: - logRelease(&r->log, request->index, request->args.entries, + logRelease(r->log, request->index, request->args.entries, request->args.n_entries); raft_free(request); @@ -947,7 +947,7 @@ static int checkLogMatchingProperty(struct raft *r, return 0; } - local_prev_term = logTermOf(&r->log, args->prev_log_index); + local_prev_term = logTermOf(r->log, args->prev_log_index); if (local_prev_term == 0) { tracef("no entry at index %llu -> reject", args->prev_log_index); return 1; @@ -993,7 +993,7 @@ static int deleteConflictingEntries(struct raft *r, for (j = 0; j < args->n_entries; j++) { struct raft_entry *entry = &args->entries[j]; raft_index entry_index = args->prev_log_index + 1 + j; - raft_term local_term = logTermOf(&r->log, entry_index); + raft_term local_term = logTermOf(r->log, entry_index); if (local_term > 0 && local_term != entry->term) { if (entry_index <= r->commit_index) { @@ -1018,7 +1018,7 @@ static int deleteConflictingEntries(struct raft *r, if (rv != 0) { return rv; } - logTruncate(&r->log, entry_index); + logTruncate(r->log, entry_index); /* Drop information about previously stored entries that have just * been discarded. */ @@ -1133,14 +1133,14 @@ int replicationAppend(struct raft *r, goto err_after_request_alloc; } - rv = logAppend(&r->log, copy.term, copy.type, ©.buf, NULL); + rv = logAppend(r->log, copy.term, copy.type, ©.buf, NULL); if (rv != 0) { goto err_after_request_alloc; } } /* Acquire the relevant entries from the log. */ - rv = logAcquire(&r->log, request->index, &request->args.entries, + rv = logAcquire(r->log, request->index, &request->args.entries, &request->args.n_entries); if (rv != 0) { goto err_after_request_alloc; @@ -1167,7 +1167,7 @@ int replicationAppend(struct raft *r, err_after_acquire_entries: /* Release the entries related to the IO request */ - logRelease(&r->log, request->index, request->args.entries, + logRelease(r->log, request->index, request->args.entries, request->args.n_entries); err_after_request_alloc: @@ -1176,7 +1176,7 @@ int replicationAppend(struct raft *r, * to future log entries not being persisted to disk. */ if (j != 0) { - logTruncate(&r->log, request->index); + logTruncate(r->log, request->index); } raft_free(request); @@ -1276,14 +1276,14 @@ int replicationInstallSnapshot(struct raft *r, } /* If our last snapshot is more up-to-date, this is a no-op */ - if (r->log.snapshot.last_index >= args->last_index) { + if (r->log->snapshot.last_index >= args->last_index) { tracef("have more recent snapshot"); *rejected = 0; return 0; } /* If we already have all entries in the snapshot, this is a no-op */ - local_term = logTermOf(&r->log, args->last_index); + local_term = logTermOf(r->log, args->last_index); if (local_term != 0 && local_term >= args->last_term) { tracef("have all entries"); *rejected = 0; @@ -1293,7 +1293,7 @@ int replicationInstallSnapshot(struct raft *r, *async = true; /* Preemptively update our in-memory state. */ - logRestore(&r->log, args->last_index, args->last_term); + logRestore(r->log, args->last_index, args->last_term); r->last_stored = 0; @@ -1425,7 +1425,7 @@ static bool shouldTakeSnapshot(struct raft *r) }; /* If we didn't reach the threshold yet, do nothing. */ - if (r->last_applied - r->log.snapshot.last_index < r->snapshot.threshold) { + if (r->last_applied - r->log->snapshot.last_index < r->snapshot.threshold) { return false; } @@ -1462,7 +1462,7 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *req, int status) goto out; } - logSnapshot(&r->log, snapshot->index, r->snapshot.trailing); + logSnapshot(r->log, snapshot->index, r->snapshot.trailing); out: takeSnapshotClose(r, snapshot); r->snapshot.pending.term = 0; @@ -1524,7 +1524,7 @@ static int takeSnapshot(struct raft *r) snapshot = &r->snapshot.pending; snapshot->index = r->last_applied; - snapshot->term = logTermOf(&r->log, r->last_applied); + snapshot->term = logTermOf(r->log, r->last_applied); snapshot->bufs = NULL; snapshot->n_bufs = 0; @@ -1587,7 +1587,7 @@ int replicationApply(struct raft *r) } for (index = r->last_applied + 1; index <= r->commit_index; index++) { - const struct raft_entry *entry = logGet(&r->log, index); + const struct raft_entry *entry = logGet(r->log, index); if (entry == NULL) { /* This can happen while installing a snapshot */ tracef("replicationApply - ENTRY NULL"); @@ -1641,11 +1641,11 @@ void replicationQuorum(struct raft *r, const raft_index index) /* TODO: fuzzy-test --seed 0x8db5fccc replication/entries/partitioned * fails the assertion below. */ - if (logTermOf(&r->log, index) == 0) { + if (logTermOf(r->log, index) == 0) { return; } - // assert(logTermOf(&r->log, index) > 0); - assert(logTermOf(&r->log, index) <= r->current_term); + // assert(logTermOf(r->log, index) > 0); + assert(logTermOf(r->log, index) <= r->current_term); for (i = 0; i < r->configuration.n; i++) { struct raft_server *server = &r->configuration.servers[i]; diff --git a/src/request.h b/src/request.h index 1633dd89b..a22baa56c 100644 --- a/src/request.h +++ b/src/request.h @@ -11,6 +11,10 @@ struct request int type; raft_index index; void *queue[2]; + uint8_t req_id[16]; + uint8_t client_id[16]; + uint8_t unique_id[16]; + uint64_t reserved[4]; }; #endif /* REQUEST_H_ */ diff --git a/src/start.c b/src/start.c index 78b71f8e9..9c04d3c35 100644 --- a/src/start.c +++ b/src/start.c @@ -68,11 +68,11 @@ static int restoreEntries(struct raft *r, raft_index conf_index = 0; size_t i; int rv; - logStart(&r->log, snapshot_index, snapshot_term, start_index); + logStart(r->log, snapshot_index, snapshot_term, start_index); r->last_stored = start_index - 1; for (i = 0; i < n; i++) { struct raft_entry *entry = &entries[i]; - rv = logAppend(&r->log, entry->term, entry->type, &entry->buf, + rv = logAppend(r->log, entry->term, entry->type, &entry->buf, entry->batch); if (rv != 0) { goto err; @@ -93,8 +93,8 @@ static int restoreEntries(struct raft *r, return 0; err: - if (logNumEntries(&r->log) > 0) { - logDiscard(&r->log, r->log.offset + 1); + if (logNumEntries(r->log) > 0) { + logDiscard(r->log, r->log->offset + 1); } return rv; } @@ -136,8 +136,8 @@ int raft_start(struct raft *r) assert(r->heartbeat_timeout != 0); assert(r->heartbeat_timeout < r->election_timeout); assert(r->install_snapshot_timeout != 0); - assert(logNumEntries(&r->log) == 0); - assert(logSnapshotIndex(&r->log) == 0); + assert(logNumEntries(r->log) == 0); + assert(logSnapshotIndex(r->log) == 0); assert(r->last_stored == 0); tracef("starting"); diff --git a/src/state.c b/src/state.c index 70839e1cb..f7945edc8 100644 --- a/src/state.c +++ b/src/state.c @@ -35,7 +35,7 @@ void raft_leader(struct raft *r, raft_id *id, const char **address) raft_index raft_last_index(struct raft *r) { - return logLastIndex(&r->log); + return logLastIndex(r->log); } raft_index raft_last_applied(struct raft *r) diff --git a/src/uv.c b/src/uv.c index b015d8dc8..4c938175d 100644 --- a/src/uv.c +++ b/src/uv.c @@ -642,6 +642,11 @@ int raft_uv_init(struct raft_io *io, memset(io, 0, sizeof *io); io->data = data; + if (transport->version == 0) { + ErrMsgPrintf(io->errmsg, "transport->version must be set"); + return RAFT_INVALID; + } + /* Ensure that the given path doesn't exceed our static buffer limit. */ if (!UV__DIR_HAS_VALID_LEN(dir)) { ErrMsgPrintf(io->errmsg, "directory path too long"); @@ -704,7 +709,7 @@ int raft_uv_init(struct raft_io *io, uvSeedRand(uv); /* Set the raft_io implementation. */ - io->version = 1; /* future-proof'ing */ + io->version = 2; /* future-proof'ing */ io->impl = uv; io->init = uvInit; io->close = uvClose; diff --git a/src/uv_encoding.c b/src/uv_encoding.c index 0d89bf514..d3201c0f6 100644 --- a/src/uv_encoding.c +++ b/src/uv_encoding.c @@ -1,5 +1,6 @@ #include "uv_encoding.h" +#include #include #include "../include/raft/uv.h" @@ -108,7 +109,7 @@ static void encodeRequestVoteResult(const struct raft_request_vote_result *p, void *cursor = buf; uint64_t flags = 0; - if (p->pre_vote == raft_tribool_true) { + if (p->pre_vote) { flags |= (1 << 0); } @@ -310,6 +311,7 @@ static void decodeRequestVote(const uv_buf_t *buf, struct raft_request_vote *p) cursor = buf->base; + p->version = 1; p->term = byteGet64(&cursor); p->candidate_id = byteGet64(&cursor); p->last_log_index = byteGet64(&cursor); @@ -320,6 +322,7 @@ static void decodeRequestVote(const uv_buf_t *buf, struct raft_request_vote *p) p->disrupt_leader = false; p->pre_vote = false; } else { + p->version = 2; uint64_t flags = byteGet64(&cursor); p->disrupt_leader = (bool)(flags & 1 << 0); p->pre_vote = (bool)(flags & 1 << 1); @@ -333,15 +336,14 @@ static void decodeRequestVoteResult(const uv_buf_t *buf, cursor = buf->base; + p->version = 1; p->term = byteGet64(&cursor); p->vote_granted = byteGet64(&cursor); - /* Support legacy RequestVoteResultV1 */ - p->pre_vote = raft_tribool_unknown; - if (buf->len > sizeofRequestVoteResultV1()) { + p->version = 2; uint64_t flags = byteGet64(&cursor); - p->pre_vote = TO_RAFT_TRIBOOL(flags & (1 << 0)); + p->pre_vote = (flags & (1 << 0)); } } @@ -408,6 +410,7 @@ static int decodeAppendEntries(const uv_buf_t *buf, cursor = buf->base; + args->version = 0; args->term = byteGet64(&cursor); args->prev_log_index = byteGet64(&cursor); args->prev_log_term = byteGet64(&cursor); @@ -428,6 +431,7 @@ static void decodeAppendEntriesResult(const uv_buf_t *buf, cursor = buf->base; + p->version = 0; p->term = byteGet64(&cursor); p->rejected = byteGet64(&cursor); p->last_log_index = byteGet64(&cursor); @@ -445,6 +449,7 @@ static int decodeInstallSnapshot(const uv_buf_t *buf, cursor = buf->base; + args->version = 0; args->term = byteGet64(&cursor); args->last_index = byteGet64(&cursor); args->last_term = byteGet64(&cursor); @@ -468,12 +473,13 @@ static void decodeTimeoutNow(const uv_buf_t *buf, struct raft_timeout_now *p) cursor = buf->base; + p->version = 0; p->term = byteGet64(&cursor); p->last_log_index = byteGet64(&cursor); p->last_log_term = byteGet64(&cursor); } -int uvDecodeMessage(const unsigned long type, +int uvDecodeMessage(uint16_t type, const uv_buf_t *header, struct raft_message *message, size_t *payload_len) @@ -481,7 +487,7 @@ int uvDecodeMessage(const unsigned long type, unsigned i; int rv = 0; - /* TODO: check type overflow */ + memset(message, 0, sizeof(*message)); message->type = (unsigned short)type; *payload_len = 0; @@ -548,42 +554,3 @@ void uvDecodeEntriesBatch(uint8_t *batch, } } } - -int uvEncodeSnapshotMeta(const struct raft_configuration *conf, - raft_index conf_index, - struct raft_buffer *buf) -{ - size_t conf_len; - void *cursor; - uint64_t *header; - void *conf_buf; - unsigned crc; - - conf_len = configurationEncodedSize(conf); - - buf->len = sizeof(*header) * 4; /* Format, CRC, configuration index/len */ - buf->len += conf_len; - buf->base = raft_malloc(buf->len); - if (buf->base == NULL) { - return RAFT_NOMEM; - } - - header = buf->base; - conf_buf = header + 4; - - configurationEncodeToBuf(conf, conf_buf); - - cursor = header; - bytePut64(&cursor, UV__DISK_FORMAT); - bytePut64(&cursor, 0); - bytePut64(&cursor, conf_index); - bytePut64(&cursor, conf_len); - - crc = byteCrc32(&header[2], sizeof(uint64_t) * 2, 0); /* Conf index/len */ - crc = byteCrc32(conf_buf, conf_len, crc); - - cursor = &header[1]; - bytePut64(&cursor, crc); - - return 0; -} diff --git a/src/uv_encoding.h b/src/uv_encoding.h index f1e5a7076..59515077d 100644 --- a/src/uv_encoding.h +++ b/src/uv_encoding.h @@ -14,7 +14,7 @@ int uvEncodeMessage(const struct raft_message *message, uv_buf_t **bufs, unsigned *n_bufs); -int uvDecodeMessage(unsigned long type, +int uvDecodeMessage(uint16_t type, const uv_buf_t *header, struct raft_message *message, size_t *payload_len); @@ -56,9 +56,5 @@ void uvEncodeBatchHeader(const struct raft_entry *entries, unsigned n, void *buf); -/* Encode the content of a snapshot metadata file. */ -int uvEncodeSnapshotMeta(const struct raft_configuration *conf, - raft_index conf_index, - struct raft_buffer *buf); #endif /* UV_ENCODING_H_ */ diff --git a/src/uv_recv.c b/src/uv_recv.c index c63b99b7c..89304d7b1 100644 --- a/src/uv_recv.c +++ b/src/uv_recv.c @@ -251,9 +251,17 @@ static void uvServerReadCb(uv_stream_t *stream, assert(s->header.base != NULL); type = byteFlip64(s->preamble[0]); - assert(type > 0); - rv = uvDecodeMessage((unsigned long)type, &s->header, &s->message, + /* Only use first 2 bytes of the type. Normally we would check if + * type doesn't overflow UINT16_MAX, but we don't do this to allow + * future legacy nodes to still handle messages that include extra + * information in the 6 unused bytes of the type field of the + * preamble. + * TODO: This is preparation to add the version of the message + * in the raft preamble. Once this change has been active for + * sufficiently long time, we can start encoding the version in some + * of the remaining bytes of s->preamble[0]. */ + rv = uvDecodeMessage((uint16_t)type, &s->header, &s->message, &s->payload.len); if (rv != 0) { Tracef(s->uv->tracer, "decode message: %s", diff --git a/src/uv_tcp.c b/src/uv_tcp.c index 1f042f7ad..f0ab59dbe 100644 --- a/src/uv_tcp.c +++ b/src/uv_tcp.c @@ -61,8 +61,15 @@ int raft_uv_tcp_init(struct raft_uv_transport *transport, { struct UvTcp *t; void *data = transport->data; + int version = transport->version; + if (version != 1) { + ErrMsgPrintf(transport->errmsg, "Invalid version: %d", version); + return RAFT_INVALID; + } + memset(transport, 0, sizeof *transport); transport->data = data; + transport->version = version; t = raft_malloc(sizeof *t); if (t == NULL) { ErrMsgOom(transport->errmsg); diff --git a/test/integration/test_fixture.c b/test/integration/test_fixture.c index a2cd0efac..3b62c0cee 100644 --- a/test/integration/test_fixture.c +++ b/test/integration/test_fixture.c @@ -20,7 +20,7 @@ struct fixture static void *setUp(const MunitParameter params[], MUNIT_UNUSED void *user_data) { - struct fixture *f = munit_malloc(sizeof *f); + struct fixture *f = munit_calloc(1, sizeof *f); struct raft_configuration configuration; unsigned i; int rc; @@ -29,7 +29,7 @@ static void *setUp(const MunitParameter params[], MUNIT_UNUSED void *user_data) FsmInit(&f->fsms[i], 2); } - rc = raft_fixture_initialize(&f->fixture); + rc = raft_fixture_init(&f->fixture); munit_assert_int(rc, ==, 0); for (i = 0; i < N_SERVERS; i++) { @@ -129,23 +129,23 @@ TEST(raft_fixture_step, tick, setUp, tearDown, 0, NULL) ASSERT_TIME(0); event = STEP; - munit_assert_int(event->server_index, ==, 0); - munit_assert_int(event->type, ==, RAFT_FIXTURE_TICK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 0); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_TICK); ASSERT_TIME(100); event = STEP; - munit_assert_int(event->server_index, ==, 1); - munit_assert_int(event->type, ==, RAFT_FIXTURE_TICK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 1); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_TICK); ASSERT_TIME(100); event = STEP; - munit_assert_int(event->server_index, ==, 2); - munit_assert_int(event->type, ==, RAFT_FIXTURE_TICK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 2); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_TICK); ASSERT_TIME(100); event = STEP; - munit_assert_int(event->server_index, ==, 0); - munit_assert_int(event->type, ==, RAFT_FIXTURE_TICK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 0); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_TICK); ASSERT_TIME(200); return MUNIT_OK; @@ -158,8 +158,8 @@ TEST(raft_fixture_step, electionTimeout, setUp, tearDown, 0, NULL) struct raft_fixture_event *event; (void)params; event = STEP_N(28); - munit_assert_int(event->server_index, ==, 0); - munit_assert_int(event->type, ==, RAFT_FIXTURE_TICK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 0); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_TICK); ASSERT_TIME(1000); ASSERT_STATE(0, RAFT_CANDIDATE); ASSERT_STATE(1, RAFT_FOLLOWER); @@ -176,12 +176,12 @@ TEST(raft_fixture_step, flushSend, setUp, tearDown, 0, NULL) (void)params; STEP_UNTIL_STATE_IS(0, RAFT_CANDIDATE); event = STEP; - munit_assert_int(event->server_index, ==, 0); - munit_assert_int(event->type, ==, RAFT_FIXTURE_NETWORK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 0); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_NETWORK); ASSERT_TIME(1000); event = STEP; - munit_assert_int(event->server_index, ==, 0); - munit_assert_int(event->type, ==, RAFT_FIXTURE_NETWORK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 0); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_NETWORK); ASSERT_TIME(1000); return MUNIT_OK; } @@ -197,8 +197,8 @@ TEST(raft_fixture_step, deliver, setUp, tearDown, 0, NULL) STEP_N(2); /* Ticks for server 1 and 2 */ ASSERT_TIME(1000); event = STEP; - munit_assert_int(event->server_index, ==, 0); - munit_assert_int(event->type, ==, RAFT_FIXTURE_NETWORK); + munit_assert_int(raft_fixture_event_server_index(event), ==, 0); + munit_assert_int(raft_fixture_event_type(event), ==, RAFT_FIXTURE_NETWORK); ASSERT_TIME(1015); return MUNIT_OK; } diff --git a/test/integration/test_init.c b/test/integration/test_init.c index 830d2274d..fc4d21c95 100644 --- a/test/integration/test_init.c +++ b/test/integration/test_init.c @@ -50,3 +50,32 @@ TEST(raft_init, incompatIoFsmAsyncSnapshotNull, NULL, NULL, 0, NULL) return MUNIT_OK; } +TEST(raft_init, ioVersionNotSet, NULL, NULL, 0, NULL) +{ + struct raft r = {0}; + struct raft_io io = {0}; + struct raft_fsm fsm = {0}; + io.version = 0; + fsm.version = 3; + + int rc; + rc = raft_init(&r, &io, &fsm, 1, "1"); + munit_assert_int(rc, ==, -1); + munit_assert_string_equal(r.errmsg, "io->version must be set"); + return MUNIT_OK; +} + +TEST(raft_init, fsmVersionNotSet, NULL, NULL, 0, NULL) +{ + struct raft r = {0}; + struct raft_io io = {0}; + struct raft_fsm fsm = {0}; + io.version = 2; + fsm.version = 0; + + int rc; + rc = raft_init(&r, &io, &fsm, 1, "1"); + munit_assert_int(rc, ==, -1); + munit_assert_string_equal(r.errmsg, "fsm->version must be set"); + return MUNIT_OK; +} diff --git a/test/integration/test_membership.c b/test/integration/test_membership.c index 062ff02bb..d93e6c63c 100644 --- a/test/integration/test_membership.c +++ b/test/integration/test_membership.c @@ -1,4 +1,5 @@ #include "../../src/configuration.h" +#include "../../src/progress.h" #include "../lib/cluster.h" #include "../lib/runner.h" diff --git a/test/integration/test_replication.c b/test/integration/test_replication.c index 53c57ca97..07ec43872 100644 --- a/test/integration/test_replication.c +++ b/test/integration/test_replication.c @@ -1,3 +1,4 @@ +#include "../../src/progress.h" #include "../lib/cluster.h" #include "../lib/runner.h" diff --git a/test/integration/test_uv_append.c b/test/integration/test_uv_append.c index 860b01e92..d995457b3 100644 --- a/test/integration/test_uv_append.c +++ b/test/integration/test_uv_append.c @@ -94,6 +94,7 @@ static void tearDown(void *data) \ _rv = uv_loop_init(&_loop); \ munit_assert_int(_rv, ==, 0); \ + _transport.version = 1; \ _rv = raft_uv_tcp_init(&_transport, &_loop); \ munit_assert_int(_rv, ==, 0); \ _rv = raft_uv_init(&_io, &_loop, f->dir, &_transport); \ diff --git a/test/integration/test_uv_load.c b/test/integration/test_uv_load.c index 326b341aa..1f89ccd22 100644 --- a/test/integration/test_uv_load.c +++ b/test/integration/test_uv_load.c @@ -96,6 +96,7 @@ struct snapshot \ /* Initialize the instance, loading existing data, but discarding \ * it. This makes sure that the start index is correctly set. */ \ + _transport.version = 1; \ _rv = raft_uv_tcp_init(&_transport, &f->loop); \ munit_assert_int(_rv, ==, 0); \ _rv = raft_uv_init(&_io, &f->loop, f->dir, &_transport); \ @@ -177,6 +178,7 @@ struct snapshot \ /* Initialize the instance, loading existing data, but discarding \ * it. This makes sure that the start index is correctly set. */ \ + _transport.version = 1; \ _rv = raft_uv_tcp_init(&_transport, &f->loop); \ munit_assert_int(_rv, ==, 0); \ _rv = raft_uv_init(&_io, &f->loop, f->dir, &_transport); \ diff --git a/test/integration/test_uv_recv.c b/test/integration/test_uv_recv.c index 4a8ff2128..b66668a8c 100644 --- a/test/integration/test_uv_recv.c +++ b/test/integration/test_uv_recv.c @@ -138,6 +138,7 @@ static void peerCloseCb(struct raft_io *io) int _rv; \ _rv = uv_loop_init(_loop); \ munit_assert_int(_rv, ==, 0); \ + _transport->version = 1; \ _rv = raft_uv_tcp_init(_transport, _loop); \ munit_assert_int(_rv, ==, 0); \ _rv = raft_uv_init(_io, _loop, f->dir, _transport); \ @@ -310,7 +311,7 @@ TEST(recv, requestVoteResult, setUp, tearDown, 0, NULL) message.type = RAFT_IO_REQUEST_VOTE_RESULT; message.request_vote_result.term = 3; message.request_vote_result.vote_granted = true; - message.request_vote_result.pre_vote = raft_tribool_false; + message.request_vote_result.pre_vote = false; PEER_SEND(&message); RECV(&message); return MUNIT_OK; diff --git a/test/integration/test_uv_tcp_connect.c b/test/integration/test_uv_tcp_connect.c index 1bf632f73..dcbee3d84 100644 --- a/test/integration/test_uv_tcp_connect.c +++ b/test/integration/test_uv_tcp_connect.c @@ -130,6 +130,7 @@ static void *setUpDeps(const MunitParameter params[], SET_UP_HEAP; SETUP_LOOP; SETUP_TCP_SERVER; + f->transport.version = 1; rv = raft_uv_tcp_init(&f->transport, &f->loop); munit_assert_int(rv, ==, 0); return f; diff --git a/test/integration/test_uv_tcp_listen.c b/test/integration/test_uv_tcp_listen.c index 2dbd37882..3d6c03126 100644 --- a/test/integration/test_uv_tcp_listen.c +++ b/test/integration/test_uv_tcp_listen.c @@ -61,6 +61,7 @@ static void acceptCb(struct raft_uv_transport *t, #define INIT \ do { \ int _rv; \ + f->transport.version = 1; \ _rv = raft_uv_tcp_init(&f->transport, &f->loop); \ munit_assert_int(_rv, ==, 0); \ const char *bind_addr = munit_parameters_get(params, "bind-address"); \ diff --git a/test/integration/test_uv_truncate.c b/test/integration/test_uv_truncate.c index e700e5913..0d77c56e8 100644 --- a/test/integration/test_uv_truncate.c +++ b/test/integration/test_uv_truncate.c @@ -137,6 +137,7 @@ static void tearDownDeps(void *data) \ _rv = uv_loop_init(&_loop); \ munit_assert_int(_rv, ==, 0); \ + _transport.version = 1; \ _rv = raft_uv_tcp_init(&_transport, &_loop); \ munit_assert_int(_rv, ==, 0); \ _rv = raft_uv_init(&_io, &_loop, f->dir, &_transport); \ diff --git a/test/lib/cluster.c b/test/lib/cluster.c index 74e067e51..68190389d 100644 --- a/test/lib/cluster.c +++ b/test/lib/cluster.c @@ -39,5 +39,7 @@ void cluster_randomize_init(struct raft_fixture *f) void cluster_randomize(struct raft_fixture *f, struct raft_fixture_event *event) { - randomize(f, event->server_index, event->type); + unsigned index = raft_fixture_event_server_index(event); + int type = raft_fixture_event_type(event); + randomize(f, index, type); } diff --git a/test/lib/cluster.h b/test/lib/cluster.h index be6052d31..de4799f71 100644 --- a/test/lib/cluster.h +++ b/test/lib/cluster.h @@ -49,16 +49,16 @@ atoi(munit_parameters_get(params, CLUSTER_FSM_VERSION_PARAM)); \ } \ munit_assert_int(_n, >, 0); \ - _rv = raft_fixture_initialize(&f->cluster); \ + _rv = raft_fixture_init(&f->cluster); \ munit_assert_int(_rv, ==, 0); \ for (_i = 0; _i < _n; _i++) { \ - _rv = raft_fixture_grow(&f->cluster, &f->fsms[_i]); \ - munit_assert_int(_rv, ==, 0); \ if (!_ss_async || _fsm_version < 3) { \ FsmInit(&f->fsms[_i], _fsm_version); \ } else { \ FsmInitAsync(&f->fsms[_i], _fsm_version); \ } \ + _rv = raft_fixture_grow(&f->cluster, &f->fsms[_i]); \ + munit_assert_int(_rv, ==, 0); \ } \ for (_i = 0; _i < _n; _i++) { \ raft_set_pre_vote(raft_fixture_get(&f->cluster, _i), _pre_vote); \ diff --git a/test/lib/uv.h b/test/lib/uv.h index b08bed871..e7904b76b 100644 --- a/test/lib/uv.h +++ b/test/lib/uv.h @@ -11,11 +11,12 @@ #include "tracer.h" #define FIXTURE_UV_TRANSPORT struct raft_uv_transport transport -#define SETUP_UV_TRANSPORT \ - do { \ - int rv_; \ - rv_ = raft_uv_tcp_init(&f->transport, &f->loop); \ - munit_assert_int(rv_, ==, 0); \ +#define SETUP_UV_TRANSPORT \ + do { \ + int rv_; \ + f->transport.version = 1; \ + rv_ = raft_uv_tcp_init(&f->transport, &f->loop); \ + munit_assert_int(rv_, ==, 0); \ } while (0) #define TEAR_DOWN_UV_TRANSPORT raft_uv_tcp_close(&f->transport) diff --git a/test/unit/test_log.c b/test/unit/test_log.c index be696163a..a4731ed22 100644 --- a/test/unit/test_log.c +++ b/test/unit/test_log.c @@ -12,7 +12,7 @@ struct fixture { FIXTURE_HEAP; - struct raft_log log; + struct raft_log *log; }; /****************************************************************************** @@ -22,11 +22,11 @@ struct fixture *****************************************************************************/ /* Accessors */ -#define NUM_ENTRIES logNumEntries(&f->log) -#define LAST_INDEX logLastIndex(&f->log) -#define TERM_OF(INDEX) logTermOf(&f->log, INDEX) -#define LAST_TERM logLastTerm(&f->log) -#define GET(INDEX) logGet(&f->log, INDEX) +#define NUM_ENTRIES logNumEntries(f->log) +#define LAST_INDEX logLastIndex(f->log) +#define TERM_OF(INDEX) logTermOf(f->log, INDEX) +#define LAST_TERM logLastTerm(f->log) +#define GET(INDEX) logGet(f->log, INDEX) /* Append one command entry with the given term and a hard-coded payload. */ #define APPEND(TERM) \ @@ -36,7 +36,7 @@ struct fixture buf_.base = raft_malloc(8); \ buf_.len = 8; \ strcpy(buf_.base, "hello"); \ - rv_ = logAppend(&f->log, TERM, RAFT_COMMAND, &buf_, NULL); \ + rv_ = logAppend(f->log, TERM, RAFT_COMMAND, &buf_, NULL); \ munit_assert_int(rv_, ==, 0); \ } @@ -56,7 +56,7 @@ struct fixture int rv_; \ buf_.base = raft_malloc(8); \ buf_.len = 8; \ - rv_ = logAppend(&f->log, TERM, RAFT_COMMAND, &buf_, NULL); \ + rv_ = logAppend(f->log, TERM, RAFT_COMMAND, &buf_, NULL); \ munit_assert_int(rv_, ==, RV); \ raft_free(buf_.base); \ } @@ -77,7 +77,7 @@ struct fixture buf.base = (uint8_t *)batch + offset; \ buf.len = 8; \ *(uint64_t *)buf.base = i * 1000; \ - rv = logAppend(&f->log, 1, RAFT_COMMAND, &buf, batch); \ + rv = logAppend(f->log, 1, RAFT_COMMAND, &buf, batch); \ munit_assert_int(rv, ==, 0); \ offset += 8; \ } \ @@ -86,15 +86,15 @@ struct fixture #define ACQUIRE(INDEX) \ { \ int rv2; \ - rv2 = logAcquire(&f->log, INDEX, &entries, &n); \ + rv2 = logAcquire(f->log, INDEX, &entries, &n); \ munit_assert_int(rv2, ==, 0); \ } -#define RELEASE(INDEX) logRelease(&f->log, INDEX, entries, n); +#define RELEASE(INDEX) logRelease(f->log, INDEX, entries, n); -#define TRUNCATE(N) logTruncate(&f->log, N) -#define SNAPSHOT(INDEX, TRAILING) logSnapshot(&f->log, INDEX, TRAILING) -#define RESTORE(INDEX, TERM) logRestore(&f->log, INDEX, TERM) +#define TRUNCATE(N) logTruncate(f->log, N) +#define SNAPSHOT(INDEX, TRAILING) logSnapshot(f->log, INDEX, TRAILING) +#define RESTORE(INDEX, TERM) logRestore(f->log, INDEX, TERM) /****************************************************************************** * @@ -106,14 +106,17 @@ static void *setUp(const MunitParameter params[], MUNIT_UNUSED void *user_data) { struct fixture *f = munit_malloc(sizeof *f); SET_UP_HEAP; - logInit(&f->log); + f->log = logInit(); + if (f->log == NULL) { + munit_assert_true(false); + } return f; } static void tearDown(void *data) { struct fixture *f = data; - logClose(&f->log); + logClose(f->log); TEAR_DOWN_HEAP; free(f); } @@ -127,22 +130,22 @@ static void tearDown(void *data) /* Assert the state of the fixture's log in terms of size, front/back indexes, * offset and number of entries. */ #define ASSERT(SIZE, FRONT, BACK, OFFSET, N) \ - munit_assert_int(f->log.size, ==, SIZE); \ - munit_assert_int(f->log.front, ==, FRONT); \ - munit_assert_int(f->log.back, ==, BACK); \ - munit_assert_int(f->log.offset, ==, OFFSET); \ - munit_assert_int(logNumEntries(&f->log), ==, N) + munit_assert_int(f->log->size, ==, SIZE); \ + munit_assert_int(f->log->front, ==, FRONT); \ + munit_assert_int(f->log->back, ==, BACK); \ + munit_assert_int(f->log->offset, ==, OFFSET); \ + munit_assert_int(logNumEntries(f->log), ==, N) /* Assert the last index and term of the most recent snapshot. */ #define ASSERT_SNAPSHOT(INDEX, TERM) \ - munit_assert_int(f->log.snapshot.last_index, ==, INDEX); \ - munit_assert_int(f->log.snapshot.last_term, ==, TERM) + munit_assert_int(f->log->snapshot.last_index, ==, INDEX); \ + munit_assert_int(f->log->snapshot.last_term, ==, TERM) /* Assert that the term of entry at INDEX equals TERM. */ #define ASSERT_TERM_OF(INDEX, TERM) \ { \ const struct raft_entry *entry; \ - entry = logGet(&f->log, INDEX); \ + entry = logGet(f->log, INDEX); \ munit_assert_ptr_not_null(entry); \ munit_assert_int(entry->term, ==, TERM); \ } @@ -152,14 +155,14 @@ static void tearDown(void *data) #define ASSERT_REFCOUNT(INDEX, COUNT) \ { \ size_t i; \ - munit_assert_ptr_not_null(f->log.refs); \ - for (i = 0; i < f->log.refs_size; i++) { \ - if (f->log.refs[i].index == INDEX) { \ - munit_assert_int(f->log.refs[i].count, ==, COUNT); \ + munit_assert_ptr_not_null(f->log->refs); \ + for (i = 0; i < f->log->refs_size; i++) { \ + if (f->log->refs[i].index == INDEX) { \ + munit_assert_int(f->log->refs[i].count, ==, COUNT); \ break; \ } \ } \ - if (i == f->log.refs_size) { \ + if (i == f->log->refs_size) { \ munit_errorf("no refcount found for entry with index %d", \ (int)INDEX); \ } \ @@ -524,7 +527,7 @@ TEST(logAppend, many, setUp, tearDown, 0, NULL) for (i = 0; i < 3000; i++) { APPEND(1 /* term */); } - munit_assert_int(f->log.refs_size, ==, 4096); + munit_assert_int(f->log->refs_size, ==, 4096); return MUNIT_OK; } @@ -606,7 +609,7 @@ TEST(logAppend, oom, setUp, tearDown, 0, logAppendOom) buf.base = NULL; buf.len = 0; HeapFaultEnable(&f->heap); - rv = logAppend(&f->log, 1, RAFT_COMMAND, &buf, NULL); + rv = logAppend(f->log, 1, RAFT_COMMAND, &buf, NULL); munit_assert_int(rv, ==, RAFT_NOMEM); return MUNIT_OK; } @@ -652,7 +655,7 @@ TEST(logAppendConfiguration, oom, setUp, tearDown, 0, logAppendConfigurationOom) HeapFaultEnable(&f->heap); - rv = logAppendConfiguration(&f->log, 1, &configuration); + rv = logAppendConfiguration(f->log, 1, &configuration); munit_assert_int(rv, ==, RAFT_NOMEM); configurationClose(&configuration); @@ -809,7 +812,7 @@ TEST(logAcquire, oom, setUp, tearDown, 0, NULL) HeapFaultConfig(&f->heap, 0, 1); HeapFaultEnable(&f->heap); - rv = logAcquire(&f->log, 1, &entries, &n); + rv = logAcquire(f->log, 1, &entries, &n); munit_assert_int(rv, ==, RAFT_NOMEM); return MUNIT_OK; @@ -942,7 +945,7 @@ TEST(logTruncate, batch, setUp, tearDown, 0, NULL) struct fixture *f = data; APPEND_BATCH(3 /* n entries */); TRUNCATE(1 /* index */); - munit_assert_int(f->log.size, ==, 0); + munit_assert_int(f->log->size, ==, 0); return MUNIT_OK; } @@ -1029,7 +1032,7 @@ TEST(logTruncate, acquiredOom, setUp, tearDown, 0, logTruncateAcquiredOom) HeapFaultEnable(&f->heap); - rv = logAppend(&f->log, 2, RAFT_COMMAND, &buf, NULL); + rv = logAppend(f->log, 2, RAFT_COMMAND, &buf, NULL); munit_assert_int(rv, ==, RAFT_NOMEM); RELEASE(2);