Skip to content

Commit

Permalink
Implement hard liveliness check (#316)
Browse files Browse the repository at this point in the history
* Hard liveliness check

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

Update

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Revert "Hard liveliness check"

This reverts commit 7d94502.

* Update

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Uncrustify

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>

* Review comments

Signed-off-by: Pablo Garrido <pablogs9@gmail.com>
  • Loading branch information
pablogs9 authored Mar 25, 2022
1 parent b5187a9 commit 12bed76
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 68 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ option(UCLIENT_PROFILE_MATCHING "Enable QoS matching support." OFF)
set(UCLIENT_SHARED_MEMORY_MAX_ENTITIES 4 CACHE STRING "Max number of entities involved in shared memory.")
set(UCLIENT_SHARED_MEMORY_STATIC_MEM_SIZE 10 CACHE STRING "Max number data buffers stored in shared memory")

option(UCLIENT_HARD_LIVELINESS_CHECK "Enable hard liveliness check." OFF)
set(UCLIENT_HARD_LIVELINESS_CHECK_TIMEOUT 10000 CACHE STRING "Set the hard liveliness check interval in milliseconds.")

# Off-standard features and tweaks
option(UCLIENT_TWEAK_XRCE_WRITE_LIMIT "This feature uses a tweak to allow XRCE WRITE DATA submessages grater than 64 kB." ON)

Expand Down
12 changes: 12 additions & 0 deletions include/uxr/client/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,17 @@

#cmakedefine UCLIENT_TWEAK_XRCE_WRITE_LIMIT

#cmakedefine UCLIENT_HARD_LIVELINESS_CHECK

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define UXR_CONFIG_HARD_LIVELINESS_CHECK_TIMEOUT_STR "@UCLIENT_HARD_LIVELINESS_CHECK_TIMEOUT@"
#endif


// Version checks
#if UXR_CLIENT_VERSION_MAJOR >= 3
#error UCLIENT_HARD_LIVELINESS_CHECK shall be included in session API
#error MTU must be included in CREATE_CLIENT_Payload properties
#endif

#endif // _UXR_CLIENT_CONFIG_H_
18 changes: 18 additions & 0 deletions include/uxr/client/core/type/xrce_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern "C"
#endif // ifdef __cplusplus

#include <uxr/client/defines.h>
#include <uxr/client/config.h>

#include <ucdr/microcdr.h>
#include <stdint.h>
Expand All @@ -40,7 +41,24 @@ extern "C"
#define UXR_SAMPLE_DELTA_SEQUENCE_MAX 8
#define UXR_PACKED_SAMPLES_SEQUENCE_MAX 8
#define UXR_TRANSPORT_LOCATOR_SEQUENCE_MAX 4

#ifdef UCLIENT_PROFILE_SHARED_MEMORY
#define PROFILE_SHARED_MEMORY_SEQ_COUNT 1
#else
#define PROFILE_SHARED_MEMORY_SEQ_COUNT 0
#endif // ifdef UCLIENT_PROFILE_SHARED_MEMORY

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define HARD_LIVELINESS_CHECK_SEQ_COUNT 1
#else
#define HARD_LIVELINESS_CHECK_SEQ_COUNT 0
#endif // ifdef UCLIENT_HARD_LIVELINESS_CHECK

#if (PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT) == 0
#define UXR_PROPERTY_SEQUENCE_MAX 1
#else
#define UXR_PROPERTY_SEQUENCE_MAX PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT
#endif // if (PROFILE_SHARED_MEMORY_SEQ_COUNT + HARD_LIVELINESS_CHECK_SEQ_COUNT) == 0

typedef struct Time_t
{
Expand Down
2 changes: 1 addition & 1 deletion include/uxr/client/util/ping.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ extern "C"
#define UXR_PING_BUF 16 // 4 (HEADER SIZE) + 4 (SUBHEADER_SIZE) + 8 (GET_Info payload)

#define GET_INFO_MSG_SIZE 8
#define GET_INFO_REQUEST_ID 9
#define GET_INFO_REQUEST_PING_ID 10

struct uxrSession;

Expand Down
145 changes: 88 additions & 57 deletions src/c/core/session/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@
#include "../../profile/shared_memory/shared_memory_internal.h"

#ifdef UCLIENT_PROFILE_SHARED_MEMORY
#define CREATE_SESSION_PROPERTIES_MAX_SIZE 21
#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE + \
CREATE_SESSION_PROPERTIES_MAX_SIZE)
#define PROFILE_SHARED_MEMORY_ADD_SIZE 21
#else
#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE)
#define PROFILE_SHARED_MEMORY_ADD_SIZE 0
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
#define HARD_LIVELINESS_CHECK_ADD_SIZE 26
#else
#define HARD_LIVELINESS_CHECK_ADD_SIZE 0
#endif /* ifdef UCLIENT_HARD_LIVELINESS_CHECK */

#define CREATE_SESSION_PROPERTIES_MAX_SIZE PROFILE_SHARED_MEMORY_ADD_SIZE + HARD_LIVELINESS_CHECK_ADD_SIZE


#define CREATE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + CREATE_CLIENT_PAYLOAD_SIZE + \
CREATE_SESSION_PROPERTIES_MAX_SIZE)

#define DELETE_SESSION_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + DELETE_CLIENT_PAYLOAD_SIZE)
#define HEARTBEAT_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + HEARTBEAT_PAYLOAD_SIZE)
#define ACKNACK_MAX_MSG_SIZE (MAX_HEADER_SIZE + SUBHEADER_SIZE + ACKNACK_PAYLOAD_SIZE)
Expand Down Expand Up @@ -102,6 +112,12 @@ static void read_submessage_acknack(
static void read_submessage_timestamp_reply(
uxrSession* session,
ucdrBuffer* submessage);
static void read_submessage_get_info(
uxrSession* session,
ucdrBuffer* submessage);
void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage);
#ifdef PERFORMANCE_TESTING
static void read_submessage_performance(
uxrSession* session,
Expand All @@ -125,9 +141,6 @@ static bool run_session_until_sync(
uxrSession* session,
int timeout);

pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer);

//==================================================================
// PUBLIC
//==================================================================
Expand Down Expand Up @@ -626,52 +639,6 @@ void uxr_flash_output_streams(
//==================================================================
// PRIVATE
//==================================================================
pong_status_t uxr_acknack_pong(
ucdrBuffer* buffer)
{
bool success = false;
bool ret = false;
bool active_session = false;

if (ucdr_buffer_remaining(buffer) > SUBHEADER_SIZE)
{
uint8_t id = 0;
uint8_t flags = 0;
uint16_t length = 0;
uxr_deserialize_submessage_header(buffer, &id, &flags, &length);
success = ucdr_buffer_remaining(buffer) >= length;

if (success && id == SUBMESSAGE_ID_INFO)
{
INFO_Payload info_payload;

success &= uxr_deserialize_BaseObjectReply(buffer, &info_payload.base);
active_session = info_payload.base.result.implementation_status;

success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_config);

if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(buffer, &info_payload.object_info.config);
}

success &= ucdr_deserialize_bool(buffer, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(buffer, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(buffer,
&info_payload.object_info.activity._.agent.availability);
ret = success && (info_payload.object_info.activity._.agent.availability > 0);
}
}
}
}

return ret ? (active_session ? PONG_IN_SESSION_STATUS : PONG_NO_SESSION_STATUS) : NO_PONG_STATUS;
}

bool uxr_run_session_until_pong(
uxrSession* session,
int timeout_ms)
Expand Down Expand Up @@ -877,10 +844,6 @@ void read_message(
uxrStreamId id = uxr_stream_id_from_raw(stream_id_raw, UXR_INPUT_STREAM);
read_stream(session, ub, id, seq_num);
}
else
{
session->on_pong_flag = uxr_acknack_pong(ub);
}
}

void read_stream(
Expand Down Expand Up @@ -989,6 +952,14 @@ void read_submessage(
read_submessage_timestamp_reply(session, submessage);
break;

case SUBMESSAGE_ID_GET_INFO:
read_submessage_get_info(session, submessage);
break;

case SUBMESSAGE_ID_INFO:
read_submessage_info(session, submessage);
break;

#ifdef PERFORMANCE_TESTING
case SUBMESSAGE_ID_PERFORMANCE:
read_submessage_performance(session, submessage, length);
Expand Down Expand Up @@ -1095,6 +1066,66 @@ void read_submessage_timestamp_reply(
process_timestamp_reply(session, &timestamp_reply);
}

void read_submessage_get_info(
uxrSession* session,
ucdrBuffer* submessage)
{
GET_INFO_Payload get_info_payload = {
0
};
INFO_Payload info_payload = {
0
};

uxr_deserialize_GET_INFO_Payload(submessage, &get_info_payload);

info_payload.base.related_request.request_id = get_info_payload.base.request_id;

uint8_t buffer[12];
ucdrBuffer ub;
ucdr_init_buffer_origin_offset(&ub, buffer, sizeof(buffer), 0u, uxr_session_header_offset(&session->info));

uxr_serialize_INFO_Payload(&ub, &info_payload);
uxr_stamp_session_header(&session->info, 0, 0, ub.init);

send_message(session, buffer, ucdr_buffer_length(&ub));
}

void read_submessage_info(
uxrSession* session,
ucdrBuffer* submessage)
{
INFO_Payload info_payload;

bool success = true;

success &= uxr_deserialize_BaseObjectReply(submessage, &info_payload.base);
bool active_session = info_payload.base.result.implementation_status;

success &= ucdr_deserialize_bool(submessage, &info_payload.object_info.optional_config);

if (info_payload.object_info.optional_config)
{
success &= uxr_deserialize_ObjectVariant(submessage, &info_payload.object_info.config);
}

success &= ucdr_deserialize_bool(submessage, &info_payload.object_info.optional_activity);
if (info_payload.object_info.optional_activity)
{
success &= ucdr_deserialize_uint8_t(submessage, &info_payload.object_info.activity.kind);
if (success && DDS_XRCE_OBJK_AGENT == info_payload.object_info.activity.kind)
{
success &= ucdr_deserialize_int16_t(submessage,
&info_payload.object_info.activity._.agent.availability);
session->on_pong_flag = (success && (info_payload.object_info.activity._.agent.availability > 0)) ?
(active_session ?
PONG_IN_SESSION_STATUS :
PONG_NO_SESSION_STATUS) :
NO_PONG_STATUS;
}
}
}

#ifdef PERFORMANCE_TESTING
void read_submessage_performance(
uxrSession* session,
Expand Down
33 changes: 29 additions & 4 deletions src/c/core/session/session_info.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ void uxr_buffer_create_session(
ucdrBuffer* ub,
uint16_t mtu)
{
CREATE_CLIENT_Payload payload;
CREATE_CLIENT_Payload payload = {
0
};
payload.client_representation.xrce_cookie = DDS_XRCE_XRCE_COOKIE;
payload.client_representation.xrce_version = DDS_XRCE_XRCE_VERSION;
payload.client_representation.xrce_vendor_id = VENDOR_ID_EPROSIMA;
Expand All @@ -54,12 +56,35 @@ void uxr_buffer_create_session(
payload.client_representation.client_key.data[3] = info->key[3];
payload.client_representation.session_id = info->id;
payload.client_representation.optional_properties = false;

#ifdef UCLIENT_PROFILE_SHARED_MEMORY
payload.client_representation.optional_properties = true;
payload.client_representation.properties.size = 1;
payload.client_representation.properties.data[0].name = "uxr_sm";
payload.client_representation.properties.data[0].value = "1";
payload.client_representation.properties.data[payload.client_representation.properties.size].name = "uxr_sm";
payload.client_representation.properties.data[payload.client_representation.properties.size].value = "1";
payload.client_representation.properties.size++;
#endif /* ifdef UCLIENT_PROFILE_SHARED_MEMORY */

#ifdef UCLIENT_HARD_LIVELINESS_CHECK
payload.client_representation.optional_properties = true;
payload.client_representation.properties.data[payload.client_representation.properties.size].name = "uxr_hl";

const char* str = UXR_CONFIG_HARD_LIVELINESS_CHECK_TIMEOUT_STR;

if (strlen(str) > 6)
{
str = "999999";
}

char buffer[7];
const size_t leading_zeros = 6 - strlen(str);
memset(buffer, '0', leading_zeros);
memcpy(buffer + leading_zeros, str, strlen(str));
buffer[6] = '\0';

payload.client_representation.properties.data[payload.client_representation.properties.size].value = buffer;
payload.client_representation.properties.size++;
#endif /* ifdef UCLIENT_HARD_LIVELINESS_CHECK */

payload.client_representation.mtu = mtu;

info->last_request_id = UXR_REQUEST_LOGIN;
Expand Down
Loading

0 comments on commit 12bed76

Please sign in to comment.