Skip to content

Commit a903d19

Browse files
fuzzypixelzclalancette
authored andcommitted
Don't copy contiguous bytes on reception (#343)
* Don't copy contiguous bytes on reception This uses the slices iterator API of zenoh-cpp to avoid unecessarily copying bytes into a vecotr, if and only if the bytes is made up of exactly one slice. * Don't use auto type specifiers * Remove unused `<vector>` includes * Explain lifetime of `Contiguous::slice` * Move `Payload` into `zenoh_utils` Signed-off-by: Mahmoud Mazouz <mazouz.mahmoud@outlook.com> Co-authored-by: Chris Lalancette <clalancette@gmail.com>
1 parent 0382ce5 commit a903d19

File tree

4 files changed

+86
-25
lines changed

4 files changed

+86
-25
lines changed

rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include <string>
2525
#include <utility>
2626
#include <variant>
27-
#include <vector>
2827

2928
#include "attachment_helpers.hpp"
3029
#include "cdr.hpp"
@@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp
4443
{
4544
///=============================================================================
4645
SubscriptionData::Message::Message(
47-
std::vector<uint8_t> && p,
46+
const zenoh::Bytes & p,
4847
uint64_t recv_ts,
4948
AttachmentData && attachment_)
50-
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
49+
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
5150
{
5251
}
5352

@@ -225,7 +224,7 @@ bool SubscriptionData::init()
225224

226225
sub_data->add_new_message(
227226
std::make_unique<SubscriptionData::Message>(
228-
sample.get_payload().as_vector(),
227+
sample.get_payload(),
229228
std::chrono::system_clock::now().time_since_epoch().count(),
230229
std::move(attachment_data)),
231230
std::string(sample.get_keyexpr().as_string_view()));
@@ -303,13 +302,12 @@ bool SubscriptionData::init()
303302
"Unable to obtain attachment")
304303
return;
305304
}
306-
auto payload = sample.get_payload().clone();
307305
auto attachment_value = attachment.value();
308306

309307
AttachmentData attachment_data(attachment_value);
310308
sub_data->add_new_message(
311309
std::make_unique<SubscriptionData::Message>(
312-
payload.as_vector(),
310+
sample.get_payload(),
313311
std::chrono::system_clock::now().time_since_epoch().count(),
314312
std::move(attachment_data)),
315313
std::string(sample.get_keyexpr().as_string_view()));

rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <string>
2626
#include <unordered_map>
2727
#include <variant>
28-
#include <vector>
2928

3029
#include <zenoh.hxx>
3130

@@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
5150
struct Message
5251
{
5352
explicit Message(
54-
std::vector<uint8_t> && p,
53+
const zenoh::Bytes & bytes,
5554
uint64_t recv_ts,
5655
AttachmentData && attachment);
5756

5857
~Message();
5958

60-
std::vector<uint8_t> payload;
59+
Payload payload;
6160
uint64_t recv_timestamp;
6261
AttachmentData attachment;
6362
};

rmw_zenoh_cpp/src/detail/zenoh_utils.cpp

+51-12
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,6 @@
2626

2727
namespace rmw_zenoh_cpp
2828
{
29-
///=============================================================================
30-
zenoh::Bytes create_map_and_set_sequence_num(
31-
int64_t sequence_number, std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid)
32-
{
33-
auto now = std::chrono::system_clock::now().time_since_epoch();
34-
auto now_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now);
35-
int64_t source_timestamp = now_ns.count();
36-
37-
rmw_zenoh_cpp::AttachmentData data(sequence_number, source_timestamp, gid);
38-
return data.serialize_to_zbytes();
39-
}
40-
4129
///=============================================================================
4230
ZenohQuery::ZenohQuery(
4331
const zenoh::Query & query,
@@ -82,4 +70,55 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
8270
{
8371
return received_timestamp_;
8472
}
73+
74+
Payload::Payload(const zenoh::Bytes & bytes)
75+
{
76+
// NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of
77+
// buffers contains exactly one element, it is not necessary to concatenate the list of buffers.
78+
// In this case, we store a clone of the bytes object to maintain a non-zero reference-count on
79+
// the buffer. This ensures that the slice into said buffer stays valid until we drop our copy
80+
// of the bytes object (at the very least). This case corresponds to the `Contiguous`
81+
// alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local"
82+
// communication.
83+
84+
zenoh::Bytes::SliceIterator slices = bytes.slice_iter();
85+
std::optional<zenoh::Slice> slice = slices.next();
86+
if (!slice.has_value()) {
87+
bytes_ = nullptr;
88+
} else {
89+
if (!slices.next().has_value()) {
90+
bytes_ = Contiguous {slice.value(), bytes.clone()};
91+
} else {
92+
bytes_ = bytes.as_vector();
93+
}
94+
}
95+
}
96+
97+
const uint8_t * Payload::data() const
98+
{
99+
if (std::holds_alternative<Empty>(bytes_)) {
100+
return nullptr;
101+
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
102+
return std::get<NonContiguous>(bytes_).data();
103+
} else {
104+
return std::get<Contiguous>(bytes_).slice.data;
105+
}
106+
}
107+
108+
size_t Payload::size() const
109+
{
110+
if (std::holds_alternative<Empty>(bytes_)) {
111+
return 0;
112+
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
113+
return std::get<NonContiguous>(bytes_).size();
114+
} else {
115+
return std::get<Contiguous>(bytes_).slice.len;
116+
}
117+
}
118+
119+
bool Payload::empty() const
120+
{
121+
return std::holds_alternative<Empty>(bytes_);
122+
}
123+
85124
} // namespace rmw_zenoh_cpp

rmw_zenoh_cpp/src/detail/zenoh_utils.hpp

+29-4
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,14 @@
2121
#include <chrono>
2222
#include <functional>
2323
#include <optional>
24+
#include <utility>
25+
#include <variant>
26+
#include <vector>
2427

2528
#include "rmw/types.h"
2629

2730
namespace rmw_zenoh_cpp
2831
{
29-
///=============================================================================
30-
zenoh::Bytes create_map_and_set_sequence_num(
31-
int64_t sequence_number, std::array<uint8_t, RMW_GID_STORAGE_SIZE> gid);
32-
3332
///=============================================================================
3433
// A class to store the replies to service requests.
3534
class ZenohReply final
@@ -65,6 +64,32 @@ class ZenohQuery final
6564
zenoh::Query query_;
6665
std::chrono::nanoseconds::rep received_timestamp_;
6766
};
67+
68+
class Payload
69+
{
70+
public:
71+
explicit Payload(const zenoh::Bytes & bytes);
72+
73+
~Payload() = default;
74+
75+
const uint8_t * data() const;
76+
77+
size_t size() const;
78+
79+
bool empty() const;
80+
81+
private:
82+
struct Contiguous
83+
{
84+
zenoh::Slice slice;
85+
zenoh::Bytes bytes;
86+
};
87+
using NonContiguous = std::vector<uint8_t>;
88+
using Empty = std::nullptr_t;
89+
// Is `std::vector<uint8_t>` in case of a non-contiguous payload
90+
// and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
91+
std::variant<NonContiguous, Contiguous, Empty> bytes_;
92+
};
6893
} // namespace rmw_zenoh_cpp
6994

7095
#endif // DETAIL__ZENOH_UTILS_HPP_

0 commit comments

Comments
 (0)