Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip, allow to run tokio based futures in the bastion executor #299

Merged
merged 14 commits into from
Feb 1, 2021
Merged
8 changes: 6 additions & 2 deletions src/bastion-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ maintenance = { status = "actively-developed" }

[features]
unstable = []
tokio-runtime = ["tokio"]

[dependencies]
lightproc = "0.3.5"
# lightproc = "0.3.5"
bastion-utils = "0.3.2"
# lightproc = { path = "../lightproc" }
lightproc = { path = "../lightproc" }
# bastion-utils = { path = "../bastion-utils" }

crossbeam-utils = "0.8"
Expand All @@ -49,6 +50,9 @@ lever = "0.1"
tracing = "0.1.19"
crossbeam-queue = "0.3.0"

# Feature tokio
tokio = {version = "1.1", features = ["rt", "rt-multi-thread"], optional = true }

[target.'cfg(target_os = "windows")'.dependencies]
winapi = { version = "^0.3.8", features = ["basetsd"] }

Expand Down
53 changes: 45 additions & 8 deletions src/bastion-executor/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,20 @@ where
handle
}

struct BlockingRunner {}
struct BlockingRunner {
// We keep a handle to the tokio runtime here to make sure
// it will never be dropped while the DynamicPoolManager is alive,
// In case we need to spin up some threads.
#[cfg(feature = "tokio-runtime")]
runtime_handle: tokio::runtime::Handle,
}

impl DynamicRunner for BlockingRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("static thread: running task");
task.run();
self.run(task);
}

trace!("static: empty queue, parking with timeout");
Expand All @@ -55,7 +61,7 @@ impl DynamicRunner for BlockingRunner {
loop {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
trace!("dynamic thread: running task");
task.run();
self.run(task);
}
trace!(
"dynamic thread: parking - {:?}",
Expand All @@ -66,11 +72,25 @@ impl DynamicRunner for BlockingRunner {
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.recv_timeout(THREAD_RECV_TIMEOUT) {
task.run();
self.run(task);
}
trace!("standalone thread: quitting.");
}
}

impl BlockingRunner {
fn run(&self, task: LightProc) {
#[cfg(feature = "tokio-runtime")]
{
self.runtime_handle.spawn_blocking(|| task.run());
}
#[cfg(not(feature = "tokio-runtime"))]
{
task.run();
}
}
}

/// Pool interface between the scheduler and thread pool
struct Pool {
sender: Sender<LightProc>,
Expand All @@ -80,11 +100,28 @@ struct Pool {
static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager> = OnceCell::new();

static POOL: Lazy<Pool> = Lazy::new(|| {
let runner = Arc::new(BlockingRunner {});
#[cfg(feature = "tokio-runtime")]
{
let runner = Arc::new(BlockingRunner {
// We use current() here instead of try_current()
// because we want bastion to crash as soon as possible
// if there is no available runtime.
runtime_handle: tokio::runtime::Handle::current(),
});

DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
#[cfg(not(feature = "tokio-runtime"))]
{
let runner = Arc::new(BlockingRunner {});

DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}

DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
Expand Down
65 changes: 57 additions & 8 deletions src/bastion-executor/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@ use tracing::trace;
/// use bastion_executor::prelude::*;
/// use lightproc::prelude::*;
///
/// # #[cfg(feature = "tokio-runtime")]
/// # #[tokio::main]
/// # async fn main() {
/// # start();
/// # }
/// #
/// # #[cfg(not(feature = "tokio-runtime"))]
/// # fn main() {
/// # start();
/// # }
/// #
/// # fn start() {
/// let pid = 1;
/// let stack = ProcStack::default().with_pid(pid);
///
Expand All @@ -43,6 +55,7 @@ use tracing::trace;
/// },
/// stack.clone(),
/// );
/// # }
/// ```
pub fn spawn<F, T>(future: F, stack: ProcStack) -> RecoverableHandle<T>
where
Expand Down Expand Up @@ -134,14 +147,20 @@ pub struct Pool {
receiver: Receiver<LightProc>,
}

struct AsyncRunner {}
struct AsyncRunner {
// We keep a handle to the tokio runtime here to make sure
// it will never be dropped while the DynamicPoolManager is alive,
// In case we need to spin up some threads.
#[cfg(feature = "tokio-runtime")]
runtime_handle: tokio::runtime::Handle,
}

impl DynamicRunner for AsyncRunner {
fn run_static(&self, park_timeout: Duration) -> ! {
loop {
for task in &POOL.receiver {
trace!("static: running task");
task.run();
self.run(task);
}

trace!("static: empty queue, parking with timeout");
Expand All @@ -152,7 +171,7 @@ impl DynamicRunner for AsyncRunner {
loop {
while let Ok(task) = POOL.receiver.try_recv() {
trace!("dynamic thread: running task");
task.run();
self.run(task);
}
trace!(
"dynamic thread: parking - {:?}",
Expand All @@ -163,20 +182,50 @@ impl DynamicRunner for AsyncRunner {
}
fn run_standalone(&self) {
while let Ok(task) = POOL.receiver.try_recv() {
task.run();
self.run(task);
}
trace!("standalone thread: quitting.");
}
}

impl AsyncRunner {
fn run(&self, task: LightProc) {
#[cfg(feature = "tokio-runtime")]
{
self.runtime_handle.spawn_blocking(|| task.run());
}
#[cfg(not(feature = "tokio-runtime"))]
{
task.run();
}
}
}

static DYNAMIC_POOL_MANAGER: OnceCell<DynamicPoolManager> = OnceCell::new();

static POOL: Lazy<Pool> = Lazy::new(|| {
let runner = Arc::new(AsyncRunner {});
#[cfg(feature = "tokio-runtime")]
{
let runner = Arc::new(AsyncRunner {
// We use current() here instead of try_current()
// because we want bastion to crash as soon as possible
// if there is no available runtime.
runtime_handle: tokio::runtime::Handle::current(),
});

DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}
#[cfg(not(feature = "tokio-runtime"))]
{
let runner = Arc::new(AsyncRunner {});

DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
}

DYNAMIC_POOL_MANAGER
.set(DynamicPoolManager::new(*low_watermark() as usize, runner))
.expect("couldn't create dynamic pool manager");
DYNAMIC_POOL_MANAGER
.get()
.expect("couldn't get static pool manager")
Expand Down
2 changes: 1 addition & 1 deletion src/bastion-executor/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use crossbeam_utils::sync::Parker;
use lightproc::proc_stack::ProcStack;
use std::cell::{Cell, UnsafeCell};
use std::future::Future;
use std::mem;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::mem;

///
/// This method blocks the current thread until passed future is resolved with an output (including the panic).
Expand Down
3 changes: 1 addition & 2 deletions src/bastion-executor/src/thread_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,7 @@ impl DynamicPoolManager {
.name("bastion-driver-dynamic".to_string())
.spawn(move || {
Self::affinity_pinner();
let parker = || self.park_thread();
clone.run_dynamic(&parker);
clone.run_dynamic(&|| self.park_thread());
})
.expect("cannot start dynamic thread");
});
Expand Down
17 changes: 14 additions & 3 deletions src/bastion-executor/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@ mod tests {
dbg!(core_ids);
}

#[test]
fn pool_check() {
pool::get();
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
#[tokio::test]
async fn pool_check() {
super::pool::get();
}
}

#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_tests {
#[test]
fn pool_check() {
super::pool::get();
}
}
}
19 changes: 17 additions & 2 deletions src/bastion-executor/tests/run_blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,23 @@ use lightproc::proc_stack::ProcStack;
use std::thread;
use std::time::Duration;

#[test]
fn test_run_blocking() {
#[cfg(feature = "tokio-runtime")]
mod tokio_tests {
#[tokio::test]
async fn test_run_blocking() {
super::run_test()
}
}

#[cfg(not(feature = "tokio-runtime"))]
mod no_tokio_tests {
#[test]
fn test_run_blocking() {
super::run_test()
}
}

fn run_test() {
let output = run(
blocking::spawn_blocking(
async {
Expand Down
15 changes: 10 additions & 5 deletions src/bastion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,17 @@ distributed = [
]
scaling = []
docs = ["distributed", "scaling", "default"]

tokio-runtime = ["bastion-executor/tokio-runtime"]

[package.metadata.docs.rs]
features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]

[dependencies]
bastion-executor = "0.4.0"
lightproc = "0.3.5"
# bastion-executor = { version = "= 0.3.7-alpha.0", path = "../bastion-executor" }
# lightproc = { version = "= 0.3.6-alpha.0", path = "../lightproc" }
# bastion-executor = "0.4.0"
# lightproc = "0.3.5"
bastion-executor = { path = "../bastion-executor" }
lightproc = { path = "../lightproc" }

lever = "0.1"
futures = "0.3.5"
Expand All @@ -75,6 +75,7 @@ tracing-subscriber = "0.2.6"
tracing = "0.1.15"
anyhow = "1.0.31"
crossbeam-queue = "0.3.0"
log = "0.4.14"

[target.'cfg(not(windows))'.dependencies]
nuclei = "0.1"
Expand All @@ -88,3 +89,7 @@ bastion-utils = { version = "0.3.2", path = "../bastion-utils" }
rand = "0.7.3"
rayon = "1.3.1"
num_cpus = "1.13.0"
# hello_tokio example
tokio = { version="1.1", features = ["time", "macros"] }
bastion-executor = { path = "../bastion-executor" }
once_cell = "1.5.2"
Loading