Skip to content

Commit aaf2678

Browse files
fuzzypixelzYadunund
authored andcommitted
Don't copy contiguous bytes on reception (#343)
* Don't copy contiguous bytes on reception This uses the slices iterator API of zenoh-cpp to avoid unecessarily copying bytes into a vecotr, if and only if the bytes is made up of exactly one slice. * Don't use auto type specifiers * Remove unused `<vector>` includes * Explain lifetime of `Contiguous::slice` * Move `Payload` into `zenoh_utils` Signed-off-by: Mahmoud Mazouz <mazouz.mahmoud@outlook.com> Co-authored-by: Chris Lalancette <clalancette@gmail.com> (cherry picked from commit cebb972) # Conflicts: # rmw_zenoh_cpp/src/detail/zenoh_utils.cpp # rmw_zenoh_cpp/src/detail/zenoh_utils.hpp
1 parent 0382ce5 commit aaf2678

File tree

4 files changed

+101
-9
lines changed

4 files changed

+101
-9
lines changed

rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp

+4-6
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
#include <string>
2525
#include <utility>
2626
#include <variant>
27-
#include <vector>
2827

2928
#include "attachment_helpers.hpp"
3029
#include "cdr.hpp"
@@ -44,10 +43,10 @@ namespace rmw_zenoh_cpp
4443
{
4544
///=============================================================================
4645
SubscriptionData::Message::Message(
47-
std::vector<uint8_t> && p,
46+
const zenoh::Bytes & p,
4847
uint64_t recv_ts,
4948
AttachmentData && attachment_)
50-
: payload(std::move(p)), recv_timestamp(recv_ts), attachment(std::move(attachment_))
49+
: payload(p), recv_timestamp(recv_ts), attachment(std::move(attachment_))
5150
{
5251
}
5352

@@ -225,7 +224,7 @@ bool SubscriptionData::init()
225224

226225
sub_data->add_new_message(
227226
std::make_unique<SubscriptionData::Message>(
228-
sample.get_payload().as_vector(),
227+
sample.get_payload(),
229228
std::chrono::system_clock::now().time_since_epoch().count(),
230229
std::move(attachment_data)),
231230
std::string(sample.get_keyexpr().as_string_view()));
@@ -303,13 +302,12 @@ bool SubscriptionData::init()
303302
"Unable to obtain attachment")
304303
return;
305304
}
306-
auto payload = sample.get_payload().clone();
307305
auto attachment_value = attachment.value();
308306

309307
AttachmentData attachment_data(attachment_value);
310308
sub_data->add_new_message(
311309
std::make_unique<SubscriptionData::Message>(
312-
payload.as_vector(),
310+
sample.get_payload(),
313311
std::chrono::system_clock::now().time_since_epoch().count(),
314312
std::move(attachment_data)),
315313
std::string(sample.get_keyexpr().as_string_view()));

rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp

+2-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
#include <string>
2626
#include <unordered_map>
2727
#include <variant>
28-
#include <vector>
2928

3029
#include <zenoh.hxx>
3130

@@ -51,13 +50,13 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
5150
struct Message
5251
{
5352
explicit Message(
54-
std::vector<uint8_t> && p,
53+
const zenoh::Bytes & bytes,
5554
uint64_t recv_ts,
5655
AttachmentData && attachment);
5756

5857
~Message();
5958

60-
std::vector<uint8_t> payload;
59+
Payload payload;
6160
uint64_t recv_timestamp;
6261
AttachmentData attachment;
6362
};

rmw_zenoh_cpp/src/detail/zenoh_utils.cpp

+61
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,65 @@ std::chrono::nanoseconds::rep ZenohReply::get_received_timestamp() const
8282
{
8383
return received_timestamp_;
8484
}
85+
<<<<<<< HEAD
86+
=======
87+
88+
int64_t get_system_time_in_ns()
89+
{
90+
auto now = std::chrono::system_clock::now().time_since_epoch();
91+
return std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
92+
}
93+
94+
///=============================================================================
95+
Payload::Payload(const zenoh::Bytes & bytes)
96+
{
97+
// NOTE(fuzzypixelz): `zenoh::Bytes` is an list of reference-couted buffers. When the list of
98+
// buffers contains exactly one element, it is not necessary to concatenate the list of buffers.
99+
// In this case, we store a clone of the bytes object to maintain a non-zero reference-count on
100+
// the buffer. This ensures that the slice into said buffer stays valid until we drop our copy
101+
// of the bytes object (at the very least). This case corresponds to the `Contiguous`
102+
// alternative of the `bytes_` variant and aims to optimize away a memcpy during "session-local"
103+
// communication.
104+
105+
zenoh::Bytes::SliceIterator slices = bytes.slice_iter();
106+
std::optional<zenoh::Slice> slice = slices.next();
107+
if (!slice.has_value()) {
108+
bytes_ = nullptr;
109+
} else {
110+
if (!slices.next().has_value()) {
111+
bytes_ = Contiguous {slice.value(), bytes.clone()};
112+
} else {
113+
bytes_ = bytes.as_vector();
114+
}
115+
}
116+
}
117+
118+
const uint8_t * Payload::data() const
119+
{
120+
if (std::holds_alternative<Empty>(bytes_)) {
121+
return nullptr;
122+
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
123+
return std::get<NonContiguous>(bytes_).data();
124+
} else {
125+
return std::get<Contiguous>(bytes_).slice.data;
126+
}
127+
}
128+
129+
size_t Payload::size() const
130+
{
131+
if (std::holds_alternative<Empty>(bytes_)) {
132+
return 0;
133+
} else if (std::holds_alternative<NonContiguous>(bytes_)) {
134+
return std::get<NonContiguous>(bytes_).size();
135+
} else {
136+
return std::get<Contiguous>(bytes_).slice.len;
137+
}
138+
}
139+
140+
bool Payload::empty() const
141+
{
142+
return std::holds_alternative<Empty>(bytes_);
143+
}
144+
145+
>>>>>>> cebb972 (Don't copy contiguous bytes on reception (#343))
85146
} // namespace rmw_zenoh_cpp

rmw_zenoh_cpp/src/detail/zenoh_utils.hpp

+34
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
#include <chrono>
2222
#include <functional>
2323
#include <optional>
24+
#include <utility>
25+
#include <variant>
26+
#include <vector>
2427

2528
#include "rmw/types.h"
2629

@@ -65,6 +68,37 @@ class ZenohQuery final
6568
zenoh::Query query_;
6669
std::chrono::nanoseconds::rep received_timestamp_;
6770
};
71+
<<<<<<< HEAD
72+
=======
73+
74+
int64_t get_system_time_in_ns();
75+
76+
class Payload
77+
{
78+
public:
79+
explicit Payload(const zenoh::Bytes & bytes);
80+
81+
~Payload() = default;
82+
83+
const uint8_t * data() const;
84+
85+
size_t size() const;
86+
87+
bool empty() const;
88+
89+
private:
90+
struct Contiguous
91+
{
92+
zenoh::Slice slice;
93+
zenoh::Bytes bytes;
94+
};
95+
using NonContiguous = std::vector<uint8_t>;
96+
using Empty = std::nullptr_t;
97+
// Is `std::vector<uint8_t>` in case of a non-contiguous payload
98+
// and `zenoh::Slice` plus a `zenoh::Bytes` otherwise.
99+
std::variant<NonContiguous, Contiguous, Empty> bytes_;
100+
};
101+
>>>>>>> cebb972 (Don't copy contiguous bytes on reception (#343))
68102
} // namespace rmw_zenoh_cpp
69103
70104
#endif // DETAIL__ZENOH_UTILS_HPP_

0 commit comments

Comments
 (0)