Skip to content

Commit bb6fd88

Browse files
committed
Recycle serialization buffers on transmission
Adds a LIFO buffer pool in the context to reuse buffers allocated on serialization. The aim is not (only) to avoid the overhead of dynamic allocation but rather to enhance the cache locality of serialization buffers.
1 parent 68b4d9a commit bb6fd88

File tree

3 files changed

+92
-14
lines changed

3 files changed

+92
-14
lines changed
+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2024 Open Source Robotics Foundation, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef DETAIL__BUFFER_POOL_HPP_
16+
#define DETAIL__BUFFER_POOL_HPP_
17+
18+
#include <cstddef>
19+
#include <mutex>
20+
#include <vector>
21+
22+
#include "rcutils/allocator.h"
23+
24+
// FIXME(fuzzypixelz): indeed, we leak all allocated buffers ;)
25+
class BufferPool
26+
{
27+
public:
28+
struct Buffer
29+
{
30+
uint8_t *data;
31+
size_t size;
32+
};
33+
34+
BufferPool() = default;
35+
36+
Buffer allocate(rcutils_allocator_t *allocator, size_t size)
37+
{
38+
std::lock_guard<std::mutex> guard(mutex_);
39+
40+
if (buffers_.empty()) {
41+
uint8_t *data = static_cast<uint8_t *>(allocator->allocate(size, allocator->state));
42+
if (data == nullptr) {
43+
return {};
44+
}
45+
return Buffer {data, size};
46+
} else {
47+
Buffer buffer = buffers_.back();
48+
buffers_.pop_back();
49+
if (buffer.size < size) {
50+
uint8_t *data = static_cast<uint8_t *>(allocator->reallocate(
51+
buffer.data, size, allocator->state));
52+
if (data == nullptr) {
53+
return {};
54+
}
55+
buffer.data = data;
56+
buffer.size = size;
57+
}
58+
return buffer;
59+
}
60+
}
61+
62+
void
63+
deallocate(Buffer buffer)
64+
{
65+
std::lock_guard<std::mutex> guard(mutex_);
66+
buffers_.push_back(buffer);
67+
}
68+
69+
private:
70+
std::vector<Buffer> buffers_;
71+
std::mutex mutex_;
72+
};
73+
74+
#endif // DETAIL__BUFFER_POOL_HPP_

rmw_zenoh_cpp/src/detail/rmw_context_impl_s.hpp

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "graph_cache.hpp"
2626
#include "rmw_node_data.hpp"
27+
#include "buffer_pool.hpp"
2728

2829
#include "rmw/ret_types.h"
2930
#include "rmw/types.h"
@@ -92,6 +93,9 @@ struct rmw_context_impl_s final
9293
// Forward declaration
9394
class Data;
9495

96+
// Pool of serialization buffers.
97+
BufferPool serialization_buffer_pool;
98+
9599
private:
96100
std::shared_ptr<Data> data_{nullptr};
97101
};

rmw_zenoh_cpp/src/detail/rmw_publisher_data.cpp

+14-14
Original file line numberDiff line numberDiff line change
@@ -210,24 +210,21 @@ rmw_ret_t PublisherData::publish(
210210
type_support_impl_);
211211

212212
// To store serialized message byte array.
213-
char * msg_bytes = nullptr;
213+
uint8_t * msg_bytes = nullptr;
214214

215215
rcutils_allocator_t * allocator = &rmw_node_->context->options.allocator;
216216

217-
auto always_free_msg_bytes = rcpputils::make_scope_exit(
218-
[&msg_bytes, allocator]() {
219-
if (msg_bytes) {
220-
allocator->deallocate(msg_bytes, allocator->state);
221-
}
222-
});
217+
rmw_context_impl_s *context_impl = static_cast<rmw_context_impl_s *>(rmw_node_->data);
223218

224-
// Get memory from the allocator.
225-
msg_bytes = static_cast<char *>(allocator->allocate(max_data_length, allocator->state));
219+
// Get memory from the serialization buffer pool.
220+
BufferPool::Buffer serialization_buffer =
221+
context_impl->serialization_buffer_pool.allocate(allocator, max_data_length);
222+
msg_bytes = serialization_buffer.data;
226223
RMW_CHECK_FOR_NULL_WITH_MSG(
227224
msg_bytes, "bytes for message is null", return RMW_RET_BAD_ALLOC);
228225

229226
// Object that manages the raw buffer
230-
eprosima::fastcdr::FastBuffer fastbuffer(msg_bytes, max_data_length);
227+
eprosima::fastcdr::FastBuffer fastbuffer(reinterpret_cast<char *>(msg_bytes), max_data_length);
231228

232229
// Object that serializes the data
233230
rmw_zenoh_cpp::Cdr ser(fastbuffer);
@@ -251,11 +248,14 @@ rmw_ret_t PublisherData::publish(
251248
sequence_number_++,
252249
entity_->copy_gid());
253250

251+
auto delete_bytes = [buffer_pool = &context_impl->serialization_buffer_pool,
252+
buffer = serialization_buffer](uint8_t * data){
253+
assert(buffer.data == data);
254+
buffer_pool->deallocate(buffer);
255+
};
256+
254257
// TODO(ahcorde): shmbuf
255-
std::vector<uint8_t> raw_data(
256-
reinterpret_cast<const uint8_t *>(msg_bytes),
257-
reinterpret_cast<const uint8_t *>(msg_bytes) + data_length);
258-
zenoh::Bytes payload(std::move(raw_data));
258+
zenoh::Bytes payload(msg_bytes, data_length, delete_bytes);
259259

260260
pub_.put(std::move(payload), std::move(options), &result);
261261
if (result != Z_OK) {

0 commit comments

Comments
 (0)