43
43
// TODO(clalancette): Make this configurable, or get it from the configuration
44
44
#define SHM_BUFFER_SIZE_MB 10
45
45
46
- // This global mapping of raw Data pointers to Data shared pointers allows graph_sub_data_handler()
47
- // to lookup the pointer, and gain a reference to a shared_ptr if it exists.
48
- // This guarantees that the Data object will not be destroyed while we are using it.
49
- static std::mutex data_to_data_shared_ptr_map_mutex;
50
- static std::unordered_map<rmw_context_impl_s::Data *,
51
- std::shared_ptr<rmw_context_impl_s::Data>> data_to_data_shared_ptr_map;
52
-
53
46
// Bundle all class members into a data struct which can be passed as a
54
47
// weak ptr to various threads for thread-safe access without capturing
55
48
// "this" ptr by reference.
56
- class rmw_context_impl_s ::Data final
49
+ class rmw_context_impl_s ::Data final : public std::enable_shared_from_this<Data>
57
50
{
58
51
public:
59
52
// Constructor.
@@ -64,7 +57,8 @@ class rmw_context_impl_s::Data final
64
57
enclave_ (std::move(enclave)),
65
58
is_shutdown_(false ),
66
59
next_entity_id_(0 ),
67
- nodes_({})
60
+ nodes_({}),
61
+ liveliness_keyexpr_(rmw_zenoh_cpp::liveliness::subscription_token(domain_id))
68
62
{
69
63
// Initialize the zenoh configuration.
70
64
std::optional<zenoh::Config> config = rmw_zenoh_cpp::get_z_config (
@@ -127,8 +121,6 @@ class rmw_context_impl_s::Data final
127
121
graph_cache_ =
128
122
std::make_shared<rmw_zenoh_cpp::GraphCache>(this ->session_ ->get_zid ());
129
123
// Setup liveliness subscriptions for discovery.
130
- std::string liveliness_str = rmw_zenoh_cpp::liveliness::subscription_token (domain_id);
131
-
132
124
// Query router/liveliness participants to get graph information before the session was started.
133
125
// We create a blocking channel that is unbounded, ie. `bound` = 0, to receive
134
126
// replies for the z_liveliness_get() call. This is necessary as if the `bound`
@@ -147,14 +139,12 @@ class rmw_context_impl_s::Data final
147
139
148
140
// Intuitively use a FIFO channel rather than building it up with a closure and a
149
141
// handler to FIFO channel
150
- zenoh::KeyExpr keyexpr (liveliness_str);
151
-
152
142
zenoh::Session::GetOptions options = zenoh::Session::GetOptions::create_default ();
153
143
options.target = zenoh::QueryTarget::Z_QUERY_TARGET_ALL;
154
144
options.payload = " " ;
155
145
156
146
auto replies = session_->liveliness_get (
157
- keyexpr ,
147
+ liveliness_keyexpr_ ,
158
148
zenoh::channels::FifoChannel (SIZE_MAX - 1 ),
159
149
zenoh::Session::LivelinessGetOptions::create_default (),
160
150
&result);
@@ -193,29 +183,29 @@ class rmw_context_impl_s::Data final
193
183
graph_guard_condition_ = std::make_unique<rmw_guard_condition_t >();
194
184
graph_guard_condition_->implementation_identifier = rmw_zenoh_cpp::rmw_zenoh_identifier;
195
185
graph_guard_condition_->data = &guard_condition_data_;
186
+ }
196
187
188
+ void init ()
189
+ {
197
190
// Setup the liveliness subscriber to receives updates from the ROS graph
198
191
// and update the graph cache.
199
- zenoh::KeyExpr keyexpr_cpp (liveliness_str.c_str ());
200
192
zenoh::Session::LivelinessSubscriberOptions sub_options =
201
193
zenoh::Session::LivelinessSubscriberOptions::create_default ();
202
194
sub_options.history = true ;
203
- graph_subscriber_ = session_->liveliness_declare_subscriber (
204
- keyexpr_cpp,
205
- [&](const zenoh::Sample & sample) {
206
- // Look up the data shared_ptr in the global map. If it is in there, use it.
207
- // If not, it is being shutdown so we can just ignore this update.
208
- std::shared_ptr<rmw_context_impl_s::Data> data_shared_ptr{nullptr };
209
- {
210
- std::lock_guard<std::mutex> lk (data_to_data_shared_ptr_map_mutex);
211
- if (data_to_data_shared_ptr_map.count (this ) == 0 ) {
212
- return ;
213
- }
214
- data_shared_ptr = data_to_data_shared_ptr_map[this ];
215
- }
216
195
196
+ // This can't be in the constructor since shared_from_this() doesn't work there.
197
+ std::weak_ptr<Data> context_impl_data_wp = shared_from_this ();
198
+ zenoh::ZResult result;
199
+ graph_subscriber_ = session_->liveliness_declare_subscriber (
200
+ liveliness_keyexpr_,
201
+ [context_impl_data_wp](const zenoh::Sample & sample) {
217
202
// Update the graph cache.
218
- data_shared_ptr->update_graph_cache (
203
+ std::shared_ptr<Data> context_impl_data = context_impl_data_wp.lock ();
204
+ if (context_impl_data == nullptr ) {
205
+ RMW_ZENOH_LOG_ERROR_NAMED (" rmw_zenoh_cpp" , " Unable to obtain context_impl." );
206
+ return ;
207
+ }
208
+ context_impl_data->update_graph_cache (
219
209
sample,
220
210
std::string (sample.get_keyexpr ().as_string_view ()));
221
211
},
@@ -435,6 +425,8 @@ class rmw_context_impl_s::Data final
435
425
std::size_t next_entity_id_;
436
426
// Nodes created from this context.
437
427
std::unordered_map<const rmw_node_t *, std::shared_ptr<rmw_zenoh_cpp::NodeData>> nodes_;
428
+
429
+ zenoh::KeyExpr liveliness_keyexpr_;
438
430
};
439
431
440
432
// /=============================================================================
@@ -443,9 +435,7 @@ rmw_context_impl_s::rmw_context_impl_s(
443
435
const std::string & enclave)
444
436
{
445
437
data_ = std::make_shared<Data>(domain_id, std::move (enclave));
446
-
447
- std::lock_guard<std::mutex> lk (data_to_data_shared_ptr_map_mutex);
448
- data_to_data_shared_ptr_map.emplace (data_.get (), data_);
438
+ data_->init ();
449
439
}
450
440
451
441
// /=============================================================================
@@ -487,11 +477,6 @@ std::size_t rmw_context_impl_s::get_next_entity_id()
487
477
// /=============================================================================
488
478
rmw_ret_t rmw_context_impl_s::shutdown ()
489
479
{
490
- {
491
- std::lock_guard<std::mutex> lk (data_to_data_shared_ptr_map_mutex);
492
- data_to_data_shared_ptr_map.erase (data_.get ());
493
- }
494
-
495
480
return data_->shutdown ();
496
481
}
497
482
0 commit comments