Skip to content

Commit fef38dc

Browse files
Replace trees computation tasks with a worker (#1303)
* Replace trees computation tasks with a worker * Address review comments * Remove review comments
1 parent f47354c commit fef38dc

File tree

2 files changed

+86
-70
lines changed

2 files changed

+86
-70
lines changed

zenoh/src/net/routing/hat/linkstate_peer/mod.rs

+35-27
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,42 @@ macro_rules! face_hat_mut {
108108
}
109109
use face_hat_mut;
110110

111+
struct TreesComputationWorker {
112+
_task: TerminatableTask,
113+
tx: flume::Sender<Arc<TablesLock>>,
114+
}
115+
116+
impl TreesComputationWorker {
117+
fn new() -> Self {
118+
let (tx, rx) = flume::bounded::<Arc<TablesLock>>(1);
119+
let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move {
120+
loop {
121+
tokio::time::sleep(std::time::Duration::from_millis(
122+
*TREES_COMPUTATION_DELAY_MS,
123+
))
124+
.await;
125+
if let Ok(tables_ref) = rx.recv_async().await {
126+
let mut tables = zwrite!(tables_ref.tables);
127+
128+
tracing::trace!("Compute trees");
129+
let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees();
130+
131+
tracing::trace!("Compute routes");
132+
pubsub::pubsub_tree_change(&mut tables, &new_children);
133+
queries::queries_tree_change(&mut tables, &new_children);
134+
drop(tables);
135+
}
136+
}
137+
});
138+
Self { _task: task, tx }
139+
}
140+
}
141+
111142
struct HatTables {
112143
peer_subs: HashSet<Arc<Resource>>,
113144
peer_qabls: HashSet<Arc<Resource>>,
114145
peers_net: Option<Network>,
115-
peers_trees_task: Option<TerminatableTask>,
146+
peers_trees_worker: TreesComputationWorker,
116147
}
117148

118149
impl HatTables {
@@ -121,36 +152,13 @@ impl HatTables {
121152
peer_subs: HashSet::new(),
122153
peer_qabls: HashSet::new(),
123154
peers_net: None,
124-
peers_trees_task: None,
155+
peers_trees_worker: TreesComputationWorker::new(),
125156
}
126157
}
127158

128159
fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>) {
129-
tracing::trace!("Schedule computations");
130-
if self.peers_trees_task.is_none() {
131-
let task = TerminatableTask::spawn(
132-
zenoh_runtime::ZRuntime::Net,
133-
async move {
134-
tokio::time::sleep(std::time::Duration::from_millis(
135-
*TREES_COMPUTATION_DELAY_MS,
136-
))
137-
.await;
138-
let mut tables = zwrite!(tables_ref.tables);
139-
140-
tracing::trace!("Compute trees");
141-
let new_children = hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees();
142-
143-
tracing::trace!("Compute routes");
144-
pubsub::pubsub_tree_change(&mut tables, &new_children);
145-
queries::queries_tree_change(&mut tables, &new_children);
146-
147-
tracing::trace!("Computations completed");
148-
hat_mut!(tables).peers_trees_task = None;
149-
},
150-
TerminatableTask::create_cancellation_token(),
151-
);
152-
self.peers_trees_task = Some(task);
153-
}
160+
tracing::trace!("Schedule trees computation");
161+
let _ = self.peers_trees_worker.tx.try_send(tables_ref);
154162
}
155163
}
156164

zenoh/src/net/routing/hat/router/mod.rs

+51-43
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,44 @@ macro_rules! face_hat_mut {
113113
}
114114
use face_hat_mut;
115115

116+
struct TreesComputationWorker {
117+
_task: TerminatableTask,
118+
tx: flume::Sender<Arc<TablesLock>>,
119+
}
120+
121+
impl TreesComputationWorker {
122+
fn new(net_type: WhatAmI) -> Self {
123+
let (tx, rx) = flume::bounded::<Arc<TablesLock>>(1);
124+
let task = TerminatableTask::spawn_abortable(zenoh_runtime::ZRuntime::Net, async move {
125+
loop {
126+
tokio::time::sleep(std::time::Duration::from_millis(
127+
*TREES_COMPUTATION_DELAY_MS,
128+
))
129+
.await;
130+
if let Ok(tables_ref) = rx.recv_async().await {
131+
let mut tables = zwrite!(tables_ref.tables);
132+
133+
tracing::trace!("Compute trees");
134+
let new_children = match net_type {
135+
WhatAmI::Router => hat_mut!(tables)
136+
.routers_net
137+
.as_mut()
138+
.unwrap()
139+
.compute_trees(),
140+
_ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(),
141+
};
142+
143+
tracing::trace!("Compute routes");
144+
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
145+
queries::queries_tree_change(&mut tables, &new_children, net_type);
146+
drop(tables);
147+
}
148+
}
149+
});
150+
Self { _task: task, tx }
151+
}
152+
}
153+
116154
struct HatTables {
117155
router_subs: HashSet<Arc<Resource>>,
118156
peer_subs: HashSet<Arc<Resource>>,
@@ -121,8 +159,8 @@ struct HatTables {
121159
routers_net: Option<Network>,
122160
peers_net: Option<Network>,
123161
shared_nodes: Vec<ZenohId>,
124-
routers_trees_task: Option<TerminatableTask>,
125-
peers_trees_task: Option<TerminatableTask>,
162+
routers_trees_worker: TreesComputationWorker,
163+
peers_trees_worker: TreesComputationWorker,
126164
router_peers_failover_brokering: bool,
127165
}
128166

@@ -136,8 +174,8 @@ impl HatTables {
136174
routers_net: None,
137175
peers_net: None,
138176
shared_nodes: vec![],
139-
routers_trees_task: None,
140-
peers_trees_task: None,
177+
routers_trees_worker: TreesComputationWorker::new(WhatAmI::Router),
178+
peers_trees_worker: TreesComputationWorker::new(WhatAmI::Peer),
141179
router_peers_failover_brokering,
142180
}
143181
}
@@ -240,45 +278,15 @@ impl HatTables {
240278
}
241279

242280
fn schedule_compute_trees(&mut self, tables_ref: Arc<TablesLock>, net_type: WhatAmI) {
243-
tracing::trace!("Schedule computations");
244-
if (net_type == WhatAmI::Router && self.routers_trees_task.is_none())
245-
|| (net_type == WhatAmI::Peer && self.peers_trees_task.is_none())
246-
{
247-
let task = TerminatableTask::spawn(
248-
zenoh_runtime::ZRuntime::Net,
249-
async move {
250-
tokio::time::sleep(std::time::Duration::from_millis(
251-
*TREES_COMPUTATION_DELAY_MS,
252-
))
253-
.await;
254-
let mut tables = zwrite!(tables_ref.tables);
255-
256-
tracing::trace!("Compute trees");
257-
let new_children = match net_type {
258-
WhatAmI::Router => hat_mut!(tables)
259-
.routers_net
260-
.as_mut()
261-
.unwrap()
262-
.compute_trees(),
263-
_ => hat_mut!(tables).peers_net.as_mut().unwrap().compute_trees(),
264-
};
265-
266-
tracing::trace!("Compute routes");
267-
pubsub::pubsub_tree_change(&mut tables, &new_children, net_type);
268-
queries::queries_tree_change(&mut tables, &new_children, net_type);
269-
270-
tracing::trace!("Computations completed");
271-
match net_type {
272-
WhatAmI::Router => hat_mut!(tables).routers_trees_task = None,
273-
_ => hat_mut!(tables).peers_trees_task = None,
274-
};
275-
},
276-
TerminatableTask::create_cancellation_token(),
277-
);
278-
match net_type {
279-
WhatAmI::Router => self.routers_trees_task = Some(task),
280-
_ => self.peers_trees_task = Some(task),
281-
};
281+
tracing::trace!("Schedule trees computation");
282+
match net_type {
283+
WhatAmI::Router => {
284+
let _ = self.routers_trees_worker.tx.try_send(tables_ref);
285+
}
286+
WhatAmI::Peer => {
287+
let _ = self.peers_trees_worker.tx.try_send(tables_ref);
288+
}
289+
_ => (),
282290
}
283291
}
284292
}

0 commit comments

Comments
 (0)