Skip to content

Commit

Permalink
Incorporate the review comments.
Browse files Browse the repository at this point in the history
Signed-off-by: ChenYing Kuo <evshary@gmail.com>
  • Loading branch information
evshary committed Aug 1, 2024
1 parent fa29d34 commit 12cfbce
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 10 deletions.
2 changes: 1 addition & 1 deletion commons/zenoh-core/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ macro_rules! to_u64 {
}

// This macro allows to spawn the right amount of threads in the
// tokio executor
// async_std executor
#[macro_export]
macro_rules! zasync_executor_init {
() => {
Expand Down
21 changes: 17 additions & 4 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! This crate is intended for Zenoh's internal use.
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use std::{borrow::Cow, convert::TryFrom, str::FromStr, sync::Arc};
use std::{borrow::Cow, convert::TryFrom, str::FromStr, sync::Arc, time::Duration};

use base64::Engine;
use futures::StreamExt;
Expand Down Expand Up @@ -51,8 +51,11 @@ lazy_static::lazy_static! {
}
const RAW_KEY: &str = "_raw";

#[cfg(feature = "dynamic_plugin")]
const WORKER_THREAD_NUM: usize = 2;
#[cfg(feature = "dynamic_plugin")]
const MAX_BLOCK_THREAD_NUM: usize = 50;
#[cfg(feature = "dynamic_plugin")]
lazy_static::lazy_static! {
// The global runtime is used in the zenohd case, which we can't get the current runtime
static ref TOKIO_RUNTIME: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
Expand Down Expand Up @@ -258,9 +261,19 @@ impl Plugin for RestPlugin {

let conf: Config = serde_json::from_value(plugin_conf.clone())
.map_err(|e| zerror!("Plugin `{}` configuration error: {}", name, e))?;
let task = TOKIO_RUNTIME.spawn(run(runtime.clone(), conf.clone()));
let task = run(runtime.clone(), conf.clone());

// Reuse the runtime when it comes to static linking or standalone binary
#[cfg(not(feature = "dynamic_plugin"))]
let task = tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { timeout(Duration::from_millis(1), tokio::spawn(task)).await })
});
// Use our own runtime since dynamic linking can't access the current runtime
#[cfg(feature = "dynamic_plugin")]
let task = TOKIO_RUNTIME
.block_on(async { timeout(std::time::Duration::from_millis(1), task).await });
.block_on(async { timeout(Duration::from_millis(1), TOKIO_RUNTIME.spawn(task)).await });

if let Ok(Err(e)) = task {
bail!("REST server failed within 1ms: {e}")
}
Expand Down Expand Up @@ -345,7 +358,7 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
))
}
};
TOKIO_RUNTIME.spawn(async move {
tokio::spawn(async move {
tracing::debug!("Subscribe to {} for SSE stream", key_expr);
let sender = &sender;
let sub = req.state().0.declare_subscriber(&key_expr).await.unwrap();
Expand Down
5 changes: 0 additions & 5 deletions plugins/zenoh-plugin-storage-manager/tests/wildcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,8 @@ async fn test_wild_card_in_order() {
drop(storage);
}

// fn test_wild_card_out_of_order() {
// assert_eq!(true, true);
// }

#[test]
fn wildcard_test() {
let rt = Runtime::new().unwrap();
rt.block_on(async { test_wild_card_in_order().await });
// rt.block_on(async { test_wild_card_out_of_order() });
}

0 comments on commit 12cfbce

Please sign in to comment.