Skip to content

Commit d37fc80

Browse files
authored
refactor: compute routes on demand (#1735)
* refactor: compute routes on demand * fix: disable routes when they were updated * refactor: improve code
1 parent 0054617 commit d37fc80

File tree

19 files changed

+202
-731
lines changed

19 files changed

+202
-731
lines changed

zenoh/src/net/routing/dispatcher/face.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -390,12 +390,8 @@ impl Primitives for Face {
390390
&mut |p, m| declares.push((p.clone(), m)),
391391
);
392392

393-
// recompute routes
394-
// TODO: disable routes and recompute them in parallel to avoid holding
395-
// tables write lock for a long time.
396-
let mut root_res = wtables.root_res.clone();
397-
update_data_routes_from(&mut wtables, &mut root_res);
398-
update_query_routes_from(&mut wtables, &mut root_res);
393+
disable_all_data_routes(&mut wtables);
394+
disable_all_query_routes(&mut wtables);
399395

400396
drop(wtables);
401397
drop(ctrl_lock);

zenoh/src/net/routing/dispatcher/pubsub.rs

+24-94
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,15 @@ use zenoh_sync::get_mut_unchecked;
2626

2727
use super::{
2828
face::FaceState,
29-
resource::{DataRoutes, Direction, Resource},
29+
resource::{Direction, Resource},
3030
tables::{NodeId, Route, RoutingExpr, Tables, TablesLock},
3131
};
3232
#[zenoh_macros::unstable]
3333
use crate::key_expr::KeyExpr;
34-
use crate::net::routing::hat::{HatTrait, SendDeclare};
34+
use crate::net::routing::{
35+
hat::{HatTrait, SendDeclare},
36+
router::get_or_set_route,
37+
};
3538

3639
#[derive(Copy, Clone)]
3740
pub(crate) struct SubscriberInfo;
@@ -93,18 +96,6 @@ pub(crate) fn declare_subscription(
9396

9497
disable_matches_data_routes(&mut wtables, &mut res);
9598
drop(wtables);
96-
97-
let rtables = zread!(tables.tables);
98-
let matches_data_routes = compute_matches_data_routes(&rtables, &res);
99-
drop(rtables);
100-
101-
let wtables = zwrite!(tables.tables);
102-
for (mut res, data_routes) in matches_data_routes {
103-
get_mut_unchecked(&mut res)
104-
.context_mut()
105-
.update_data_routes(data_routes);
106-
}
107-
drop(wtables);
10899
}
109100
None => tracing::error!(
110101
"{} Declare subscriber {} for unknown scope {}!",
@@ -157,18 +148,6 @@ pub(crate) fn undeclare_subscription(
157148
{
158149
tracing::debug!("{} Undeclare subscriber {} ({})", face, id, res.expr());
159150
disable_matches_data_routes(&mut wtables, &mut res);
160-
drop(wtables);
161-
162-
let rtables = zread!(tables.tables);
163-
let matches_data_routes = compute_matches_data_routes(&rtables, &res);
164-
drop(rtables);
165-
166-
let wtables = zwrite!(tables.tables);
167-
for (mut res, data_routes) in matches_data_routes {
168-
get_mut_unchecked(&mut res)
169-
.context_mut()
170-
.update_data_routes(data_routes);
171-
}
172151
Resource::clean(&mut res);
173152
drop(wtables);
174153
} else {
@@ -177,67 +156,17 @@ pub(crate) fn undeclare_subscription(
177156
}
178157
}
179158

180-
pub(crate) fn compute_data_routes(tables: &Tables, expr: &mut RoutingExpr) -> DataRoutes {
181-
let mut routes = DataRoutes::default();
182-
tables
183-
.hat_code
184-
.compute_data_routes(tables, &mut routes, expr);
185-
routes
186-
}
187-
188-
pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc<Resource>) {
189-
if res.context.is_some() && !res.expr().contains('*') && res.has_subs() {
190-
let mut res_mut = res.clone();
191-
let res_mut = get_mut_unchecked(&mut res_mut);
192-
tables.hat_code.compute_data_routes(
193-
tables,
194-
&mut res_mut.context_mut().data_routes,
195-
&mut RoutingExpr::new(res, ""),
196-
);
197-
res_mut.context_mut().valid_data_routes = true;
198-
}
199-
}
200-
201-
pub(crate) fn update_data_routes_from(tables: &mut Tables, res: &mut Arc<Resource>) {
202-
update_data_routes(tables, res);
203-
let res = get_mut_unchecked(res);
204-
for child in res.children.values_mut() {
205-
update_data_routes_from(tables, child);
206-
}
207-
}
208-
209-
pub(crate) fn compute_matches_data_routes<'a>(
210-
tables: &'a Tables,
211-
res: &'a Arc<Resource>,
212-
) -> Vec<(Arc<Resource>, DataRoutes)> {
213-
let mut routes = vec![];
214-
if res.context.is_some() {
215-
if !res.expr().contains('*') && res.has_subs() {
216-
let mut expr = RoutingExpr::new(res, "");
217-
routes.push((res.clone(), compute_data_routes(tables, &mut expr)));
218-
}
219-
for match_ in &res.context().matches {
220-
let match_ = match_.upgrade().unwrap();
221-
if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_subs() {
222-
let mut expr = RoutingExpr::new(&match_, "");
223-
let match_routes = compute_data_routes(tables, &mut expr);
224-
routes.push((match_, match_routes));
225-
}
159+
pub(crate) fn disable_all_data_routes(tables: &mut Tables) {
160+
pub(crate) fn disable_all_data_routes_rec(res: &mut Arc<Resource>) {
161+
let res = get_mut_unchecked(res);
162+
if let Some(ctx) = &mut res.context {
163+
ctx.disable_data_routes();
226164
}
227-
}
228-
routes
229-
}
230-
231-
pub(crate) fn update_matches_data_routes<'a>(tables: &'a mut Tables, res: &'a mut Arc<Resource>) {
232-
if res.context.is_some() {
233-
update_data_routes(tables, res);
234-
for match_ in &res.context().matches {
235-
let mut match_ = match_.upgrade().unwrap();
236-
if !Arc::ptr_eq(&match_, res) {
237-
update_data_routes(tables, &mut match_);
238-
}
165+
for child in res.children.values_mut() {
166+
disable_all_data_routes_rec(child);
239167
}
240168
}
169+
disable_all_data_routes_rec(&mut tables.root_res)
241170
}
242171

243172
pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
@@ -298,16 +227,17 @@ fn get_data_route(
298227
expr: &mut RoutingExpr,
299228
routing_context: NodeId,
300229
) -> Arc<Route> {
301-
let local_context = tables
302-
.hat_code
303-
.map_routing_context(tables, face, routing_context);
304-
res.as_ref()
305-
.and_then(|res| res.data_route(face.whatami, local_context))
306-
.unwrap_or_else(|| {
307-
tables
308-
.hat_code
309-
.compute_data_route(tables, expr, local_context, face.whatami)
310-
})
230+
let hat = &tables.hat_code;
231+
let local_context = hat.map_routing_context(tables, face, routing_context);
232+
let mut compute_route = || hat.compute_data_route(tables, expr, local_context, face.whatami);
233+
if let Some(data_routes) = res
234+
.as_ref()
235+
.and_then(|res| res.context.as_ref())
236+
.map(|ctx| &ctx.data_routes)
237+
{
238+
return get_or_set_route(data_routes, face.whatami, local_context, compute_route);
239+
}
240+
compute_route()
311241
}
312242

313243
#[zenoh_macros::unstable]

zenoh/src/net/routing/dispatcher/queries.rs

+29-97
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,15 @@ use zenoh_util::Timed;
3939

4040
use super::{
4141
face::FaceState,
42-
resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource},
42+
resource::{QueryRoute, QueryTargetQablSet, Resource},
4343
tables::{NodeId, RoutingExpr, Tables, TablesLock},
4444
};
4545
#[cfg(feature = "unstable")]
4646
use crate::key_expr::KeyExpr;
47-
use crate::net::routing::hat::{HatTrait, SendDeclare};
47+
use crate::net::routing::{
48+
hat::{HatTrait, SendDeclare},
49+
router::get_or_set_route,
50+
};
4851

4952
pub(crate) struct Query {
5053
src_face: Arc<FaceState>,
@@ -120,18 +123,6 @@ pub(crate) fn declare_queryable(
120123

121124
disable_matches_query_routes(&mut wtables, &mut res);
122125
drop(wtables);
123-
124-
let rtables = zread!(tables.tables);
125-
let matches_query_routes = compute_matches_query_routes(&rtables, &res);
126-
drop(rtables);
127-
128-
let wtables = zwrite!(tables.tables);
129-
for (mut res, query_routes) in matches_query_routes {
130-
get_mut_unchecked(&mut res)
131-
.context_mut()
132-
.update_query_routes(query_routes);
133-
}
134-
drop(wtables);
135126
}
136127
None => tracing::error!(
137128
"{} Declare queryable {} for unknown scope {}!",
@@ -184,18 +175,6 @@ pub(crate) fn undeclare_queryable(
184175
{
185176
tracing::debug!("{} Undeclare queryable {} ({})", face, id, res.expr());
186177
disable_matches_query_routes(&mut wtables, &mut res);
187-
drop(wtables);
188-
189-
let rtables = zread!(tables.tables);
190-
let matches_query_routes = compute_matches_query_routes(&rtables, &res);
191-
drop(rtables);
192-
193-
let wtables = zwrite!(tables.tables);
194-
for (mut res, query_routes) in matches_query_routes {
195-
get_mut_unchecked(&mut res)
196-
.context_mut()
197-
.update_query_routes(query_routes);
198-
}
199178
Resource::clean(&mut res);
200179
drop(wtables);
201180
} else {
@@ -204,67 +183,6 @@ pub(crate) fn undeclare_queryable(
204183
}
205184
}
206185

207-
pub(crate) fn compute_query_routes(tables: &Tables, res: &Arc<Resource>) -> QueryRoutes {
208-
let mut routes = QueryRoutes::default();
209-
tables
210-
.hat_code
211-
.compute_query_routes(tables, &mut routes, &mut RoutingExpr::new(res, ""));
212-
routes
213-
}
214-
215-
pub(crate) fn update_query_routes(tables: &Tables, res: &Arc<Resource>) {
216-
if res.context.is_some() && !res.expr().contains('*') && res.has_qabls() {
217-
let mut res_mut = res.clone();
218-
let res_mut = get_mut_unchecked(&mut res_mut);
219-
tables.hat_code.compute_query_routes(
220-
tables,
221-
&mut res_mut.context_mut().query_routes,
222-
&mut RoutingExpr::new(res, ""),
223-
);
224-
res_mut.context_mut().valid_query_routes = true;
225-
}
226-
}
227-
228-
pub(crate) fn update_query_routes_from(tables: &mut Tables, res: &mut Arc<Resource>) {
229-
update_query_routes(tables, res);
230-
let res = get_mut_unchecked(res);
231-
for child in res.children.values_mut() {
232-
update_query_routes_from(tables, child);
233-
}
234-
}
235-
236-
pub(crate) fn compute_matches_query_routes(
237-
tables: &Tables,
238-
res: &Arc<Resource>,
239-
) -> Vec<(Arc<Resource>, QueryRoutes)> {
240-
let mut routes = vec![];
241-
if res.context.is_some() {
242-
if !res.expr().contains('*') && res.has_qabls() {
243-
routes.push((res.clone(), compute_query_routes(tables, res)));
244-
}
245-
for match_ in &res.context().matches {
246-
let match_ = match_.upgrade().unwrap();
247-
if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_qabls() {
248-
let match_routes = compute_query_routes(tables, &match_);
249-
routes.push((match_, match_routes));
250-
}
251-
}
252-
}
253-
routes
254-
}
255-
256-
pub(crate) fn update_matches_query_routes(tables: &Tables, res: &Arc<Resource>) {
257-
if res.context.is_some() {
258-
update_query_routes(tables, res);
259-
for match_ in &res.context().matches {
260-
let match_ = match_.upgrade().unwrap();
261-
if !Arc::ptr_eq(&match_, res) {
262-
update_query_routes(tables, &match_);
263-
}
264-
}
265-
}
266-
}
267-
268186
#[inline]
269187
fn insert_pending_query(outface: &mut Arc<FaceState>, query: Arc<Query>) -> RequestId {
270188
let outface_mut = get_mut_unchecked(outface);
@@ -417,6 +335,19 @@ impl Timed for QueryCleanup {
417335
}
418336
}
419337

338+
pub(crate) fn disable_all_query_routes(tables: &mut Tables) {
339+
pub(crate) fn disable_all_query_routes_rec(res: &mut Arc<Resource>) {
340+
let res = get_mut_unchecked(res);
341+
if let Some(ctx) = &mut res.context {
342+
ctx.disable_query_routes();
343+
}
344+
for child in res.children.values_mut() {
345+
disable_all_query_routes_rec(child);
346+
}
347+
}
348+
disable_all_query_routes_rec(&mut tables.root_res)
349+
}
350+
420351
pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
421352
if res.context.is_some() {
422353
get_mut_unchecked(res).context_mut().disable_query_routes();
@@ -439,16 +370,17 @@ fn get_query_route(
439370
expr: &mut RoutingExpr,
440371
routing_context: NodeId,
441372
) -> Arc<QueryTargetQablSet> {
442-
let local_context = tables
443-
.hat_code
444-
.map_routing_context(tables, face, routing_context);
445-
res.as_ref()
446-
.and_then(|res| res.query_route(face.whatami, local_context))
447-
.unwrap_or_else(|| {
448-
tables
449-
.hat_code
450-
.compute_query_route(tables, expr, local_context, face.whatami)
451-
})
373+
let hat = &tables.hat_code;
374+
let local_context = hat.map_routing_context(tables, face, routing_context);
375+
let mut compute_route = || hat.compute_query_route(tables, expr, local_context, face.whatami);
376+
if let Some(query_routes) = res
377+
.as_ref()
378+
.and_then(|res| res.context.as_ref())
379+
.map(|ctx| &ctx.query_routes)
380+
{
381+
return get_or_set_route(query_routes, face.whatami, local_context, compute_route);
382+
}
383+
compute_route()
452384
}
453385

454386
#[cfg(feature = "stats")]

0 commit comments

Comments
 (0)