Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement hard liveliness check #316

Merged
merged 5 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ option(UCLIENT_PROFILE_MATCHING "Enable QoS matching support." OFF)
set(UCLIENT_SHARED_MEMORY_MAX_ENTITIES 4 CACHE STRING "Max number of entities involved in shared memory.")
set(UCLIENT_SHARED_MEMORY_STATIC_MEM_SIZE 10 CACHE STRING "Max number data buffers stored in shared memory")

option(UCLIENT_HARD_LIVELINESS_CHECK "Enable hard liveliness check." OFF)
set(UCLIENT_HARD_LIVELINESS_CHECK_TIMEOUT 10000 CACHE STRING "Set the hard liveliness check interval in milliseconds.")

# Off-standard features and tweaks
option(UCLIENT_TWEAK_XRCE_WRITE_LIMIT "This feature uses a tweak to allow XRCE WRITE DATA submessages grater than 64 kB." ON)

Expand Down
12 changes: 12 additions & 0 deletions include/uxr/client/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,17 @@

#cmakedefine UCLIENT_TWEAK_XRCE_WRITE_LIMIT

#cmakedefine UCLIENT_HARD_LIVELINESS_CHECK

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define UXR_CONFIG_HARD_LIVELINESS_CHECK_TIMEOUT_STR "@UCLIENT_HARD_LIVELINESS_CHECK_TIMEOUT@"
#endif


// Version checks
#if UXR_CLIENT_VERSION_MAJOR >= 3
#error UCLIENT_HARD_LIVELINESS_CHECK shall be included in session API
#error MTU must be included in CREATE_CLIENT_Payload properties
#endif

#endif // _UXR_CLIENT_CONFIG_H_
18 changes: 18 additions & 0 deletions include/uxr/client/core/type/xrce_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern "C"
#endif // ifdef __cplusplus

#include <uxr/client/defines.h>
#include <uxr/client/config.h>

#include <ucdr/microcdr.h>
#include <stdint.h>
Expand All @@ -40,7 +41,24 @@ extern "C"
#define UXR_SAMPLE_DELTA_SEQUENCE_MAX 8
#define UXR_PACKED_SAMPLES_SEQUENCE_MAX 8
#define UXR_TRANSPORT_LOCATOR_SEQUENCE_MAX 4

#ifdef UCLIENT_PROFILE_SHARED_MEMORY
#define PROFILE_SHARED_MEMORY_SEQ_COUNT 1
#else
#define PROFILE_SHARED_MEMORY_SEQ_COUNT 0
#endif // ifdef UCLIENT_PROFILE_SHARED_MEMORY

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define HARD_LIVELINESS_CHECK_SEQ_COUNT 1
#else
#define HARD_LIVELINESS_CHECK_SEQ_COUNT 0
#endif // ifdef UCLIENT_HARD_LIVELINESS_CHECK

#if (PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT) == 0
#define UXR_PROPERTY_SEQUENCE_MAX 1
#else
#define UXR_PROPERTY_SEQUENCE_MAX PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT
#endif // if (PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT) == 0

typedef struct Time_t
{
Expand Down
2 changes: 1 addition & 1 deletion include/uxr/client/util/ping.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extern "C"
#define UXR_PING_BUF 16 // 4 (HEADER SIZE) + 4 (SUBHEADER_SIZE) + 8 (GET_Info payload)

#define GET_INFO_MSG_SIZE 8
#define GET_INFO_REQUEST_ID 9
#define GET_INFO_REQUEST_PING_ID 10

struct uxrSession;

Expand Down
145 changes: 88 additions & 57 deletions src/c/core/session/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@
#include "../../profile/shared_memory/shared_memory_internal.h"

#ifdef UCLIENT_PROFILE_SHARED_MEMORY
#define CREATE_SESSION_PROPERTIES_MAX_SIZE 21
#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE + \
CREATE_SESSION_PROPERTIES_MAX_SIZE)
#define PROFILE_SHARED_MEMORY_ADD_SIZE 21
#else
#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE)
#define PROFILE_SHARED_MEMORY_ADD_SIZE 0
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define HARD_LIVELINESS_CHECK_ADD_SIZE 26
#else
#define HARD_LIVELINESS_CHECK_ADD_SIZE 0
#endif /* ifdef UCLIENT_HARD_LIVELINESS_CHECK */

#define CREATE_SESSION_PROPERTIES_MAX_SIZE PROFILE_SHARED_MEMORY_ADD_SIZE + HARD_LIVELINESS_CHECK_ADD_SIZE


#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE + \
CREATE_SESSION_PROPERTIES_MAX_SIZE)

#define DELETE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + DELETE_CLIENT_PAYLOAD_SIZE)
#define HEARTBEAT_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + HEARTBEAT_PAYLOAD_SIZE)
#define ACKNACK_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + ACKNACK_PAYLOAD_SIZE)
Expand Down Expand Up @@ -102,6 +112,12 @@ static void read_submessage_acknack(
static void read_submessage_timestamp_reply(
uxrSession* session,
ucdrBuffer* submessage);
static void read_submessage_get_info(
uxrSession* session,
ucdrBuffer* submessage);
void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage);
#ifdef PERFORMANCE_TESTING
static void read_submessage_performance(
uxrSession* session,
Expand All @@ -125,9 +141,6 @@ static bool run_session_until_sync(
uxrSession* session,
int timeout);

pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer);

//==================================================================
// PUBLIC
//==================================================================
Expand Down Expand Up @@ -626,52 +639,6 @@ void uxr_flash_output_streams(
//==================================================================
// PRIVATE
//==================================================================
pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer)
{
bool success = false;
bool ret = false;
bool active_session = false;

if (ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE)
{
uint8_t id = 0;
uint8_t flags = 0;
uint16_t length = 0;
uxr_deserialize_submessage_header(buffer, &id, &flags, &length);
success = ucdr_buffer_remaining(buffer) >= length;

if (success && id == SUBMESSAGE_ID_INFO)
{
INFO_Payload info_payload;

success &= uxr_deserialize_BaseObjectReply(buffer, &info_payload.base);
active_session = info_payload.base.result.implementation_status;

success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_config);

if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(buffer, &info_payload.object_info.config);
}

success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(buffer, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(buffer,
&info_payload.object_info.activity._.agent.availability);
ret = success && (info_payload.object_info.activity._.agent.availability > 0);
}
}
}
}

return ret ? (active_session ? PONG_IN_SESSION_STATUS : PONG_NO_SESSION_STATUS) : NO_PONG_STATUS;
}

bool uxr_run_session_until_pong(
uxrSession* session,
int timeout_ms)
Expand Down Expand Up @@ -877,10 +844,6 @@ void read_message(
uxrStreamId id = uxr_stream_id_from_raw(stream_id_raw, UXR_INPUT_STREAM);
read_stream(session, ub, id, seq_num);
}
else
{
session->on_pong_flag = uxr_acknack_pong(ub);
}
}

void read_stream(
Expand Down Expand Up @@ -989,6 +952,14 @@ void read_submessage(
read_submessage_timestamp_reply(session, submessage);
break;

case SUBMESSAGE_ID_GET_INFO:
read_submessage_get_info(session, submessage);
break;

case SUBMESSAGE_ID_INFO:
read_submessage_info(session, submessage);
break;

#ifdef PERFORMANCE_TESTING
case SUBMESSAGE_ID_PERFORMANCE:
read_submessage_performance(session, submessage, length);
Expand Down Expand Up @@ -1095,6 +1066,66 @@ void read_submessage_timestamp_reply(
process_timestamp_reply(session, &timestamp_reply);
}

void read_submessage_get_info(
uxrSession* session,
ucdrBuffer* submessage)
{
GET_INFO_Payload get_info_payload = {
0
};
INFO_Payload info_payload = {
0
};

uxr_deserialize_GET_INFO_Payload(submessage, &get_info_payload);

info_payload.base.related_request.request_id = get_info_payload.base.request_id;

uint8_t buffer[12];
ucdrBuffer ub;
ucdr_init_buffer_origin_offset(&ub, buffer, sizeof(buffer), 0u, uxr_session_header_offset(&session->info));

uxr_serialize_INFO_Payload(&ub, &info_payload);
uxr_stamp_session_header(&session->info, 0, 0, ub.init);

send_message(session, buffer, ucdr_buffer_length(&ub));
}

void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage)
{
INFO_Payload info_payload;

bool success = true;

success &= uxr_deserialize_BaseObjectReply(submessage, &info_payload.base);
bool active_session = info_payload.base.result.implementation_status;

success &= ucdr_deserialize_bool(submessage, &info_payload.object_info.optional_config);

if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(submessage, &info_payload.object_info.config);
}

success &= ucdr_deserialize_bool(submessage, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(submessage, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(submessage,
&info_payload.object_info.activity._.agent.availability);
session->on_pong_flag = (success && (info_payload.object_info.activity._.agent.availability > 0)) ?
(active_session ?
PONG_IN_SESSION_STATUS :
PONG_NO_SESSION_STATUS) :
NO_PONG_STATUS;
}
}
}

#ifdef PERFORMANCE_TESTING
void read_submessage_performance(
uxrSession* session,
Expand Down
33 changes: 29 additions & 4 deletions src/c/core/session/session_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ void uxr_buffer_create_session(
ucdrBuffer* ub,
uint16_t mtu)
{
CREATE_CLIENT_Payload payload;
CREATE_CLIENT_Payload payload = {
0
};
payload.client_representation.xrce_cookie = DDS_XRCE_XRCE_COOKIE;
payload.client_representation.xrce_version = DDS_XRCE_XRCE_VERSION;
payload.client_representation.xrce_vendor_id = VENDOR_ID_EPROSIMA;
Expand All @@ -54,12 +56,35 @@ void uxr_buffer_create_session(
payload.client_representation.client_key.data[3] = info->key[3];
payload.client_representation.session_id = info->id;
payload.client_representation.optional_properties = false;

#ifdef UCLIENT_PROFILE_SHARED_MEMORY
payload.client_representation.optional_properties = true;
payload.client_representation.properties.size = 1;
payload.client_representation.properties.data[0].name = "uxr_sm";
payload.client_representation.properties.data[0].value = "1";
payload.client_representation.properties.data[payload.client_representation.properties.size].name = "uxr_sm";
payload.client_representation.properties.data[payload.client_representation.properties.size].value = "1";
payload.client_representation.properties.size++;
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
payload.client_representation.optional_properties = true;
payload.client_representation.properties.data[payload.client_representation.properties.size].name = "uxr_hl";

const char* str = UXR_CONFIG_HARD_LIVELINESS_CHECK_TIMEOUT_STR;

if (strlen(str) > 6)
{
str = "999999";
}

char buffer[7];
const size_t leading_zeros = 6 - strlen(str);
memset(buffer, '0', leading_zeros);
memcpy(buffer + leading_zeros, str, strlen(str));
buffer[6] = '\0';

payload.client_representation.properties.data[payload.client_representation.properties.size].value = buffer;
payload.client_representation.properties.size++;
#endif /* ifdef UCLIENT_HARD_LIVELINESS_CHECK */

payload.client_representation.mtu = mtu;

info->last_request_id = UXR_REQUEST_LOGIN;
Expand Down
Loading