Skip to content

Commit

Permalink
Add tokio02
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Nov 2, 2019
1 parent 6382453 commit d56a945
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 11 deletions.
150 changes: 150 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ tracing-limit = { path = "lib/tracing-limit" }
# Tokio / Futures
futures = "0.1.25"
tokio = { version = "0.1.22", features = ["io", "uds", "tcp", "rt-full", "experimental-tracing"], default-features = false }
tokio02 = { git = "https://github.com/tokio-rs/tokio", package = "tokio" }
tokio-compat = { git = "https://github.com/tokio-rs/tokio-compat" }
tokio-retry = "0.2.0"
tokio-signal = "0.2.7"
tokio-threadpool = "0.1.16"
Expand Down
14 changes: 10 additions & 4 deletions src/buffers/disk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,20 @@ impl Stream for Reader {

// This will usually complete instantly, but in the case of a large queue (or a fresh launch of
// the app), this will have to go to disk.
let next = tokio_threadpool::blocking(|| {
// let next = tokio_threadpool::blocking(|| {
// self.db
// .get(ReadOptions::new(), Key(self.read_offset))
// .unwrap()
// })
// .unwrap();

let next = tokio02::executor::thread_pool::blocking(|| {
self.db
.get(ReadOptions::new(), Key(self.read_offset))
.unwrap()
})
.unwrap();
});

if let Async::Ready(Some(value)) = next {
if let Some(value) = next {
self.unacked_sizes.push_back(value.len());
self.read_offset += 1;

Expand Down
17 changes: 10 additions & 7 deletions src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use futures::future::{ExecuteError, Executor, Future};
use std::io;
use tokio::runtime::Builder;
use tokio_compat::runtime::Builder;

pub struct Runtime {
rt: tokio::runtime::Runtime,
rt: tokio_compat::runtime::Runtime,
}

impl Runtime {
pub fn new() -> io::Result<Self> {
Ok(Runtime {
rt: tokio::runtime::Runtime::new()?,
rt: tokio_compat::runtime::Runtime::new()?,
})
}

Expand Down Expand Up @@ -47,24 +47,27 @@ impl Runtime {
}

pub fn shutdown_on_idle(self) -> impl Future<Item = (), Error = ()> {
self.rt.shutdown_on_idle()
self.rt.shutdown_on_idle();
futures::future::lazy(|| Ok(()))
}

pub fn shutdown_now(self) -> impl Future<Item = (), Error = ()> {
self.rt.shutdown_now()
self.rt.shutdown_now();
futures::future::lazy(|| Ok(()))
}
}

#[derive(Clone, Debug)]
pub struct TaskExecutor {
inner: tokio::runtime::TaskExecutor,
inner: tokio_compat::runtime::TaskExecutor,
}

impl<F> Executor<F> for TaskExecutor
where
F: Future<Item = (), Error = ()> + Send + 'static,
{
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
self.inner.execute(future)
self.inner.spawn(future);
Ok(())
}
}
1 change: 1 addition & 0 deletions src/sinks/file/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ impl AsRef<path::Path> for BytesPath {
}

#[cfg(test)]
#[cfg(feature = "dislabed")]
mod tests {
use super::*;
use crate::{
Expand Down

0 comments on commit d56a945

Please sign in to comment.