Skip to content

Commit 70482ab

Browse files
Michael-J-WardLucioFranco
authored andcommitted
enhancement(operations): abstracts runtime into runtime.rs (#1098)
* Initial runtime::Runtime wrapper * Replaces tokio::Runtime with wrapper #1085 * Add TaskExecutor Signed-off-by: Lucio Franco <luciofranco14@gmail.com>
1 parent 67ee5cc commit 70482ab

23 files changed

+170
-98
lines changed

benches/bench.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use vector::test_util::{
1010
};
1111
use vector::topology::config::TransformConfig;
1212
use vector::topology::{self, config};
13-
use vector::{sinks, sources, transforms};
13+
use vector::{runtime, sinks, sources, transforms};
1414

1515
mod batch;
1616
mod buffering;
@@ -61,7 +61,7 @@ fn benchmark_simple_pipe(c: &mut Criterion) {
6161
sinks::tcp::TcpSinkConfig::new(out_addr.to_string()),
6262
);
6363

64-
let mut rt = tokio::runtime::Runtime::new().unwrap();
64+
let mut rt = runtime::Runtime::new().unwrap();
6565

6666
let output_lines = count_receive(&out_addr);
6767

@@ -107,7 +107,7 @@ fn benchmark_simple_pipe_with_tiny_lines(c: &mut Criterion) {
107107
sinks::tcp::TcpSinkConfig::new(out_addr.to_string()),
108108
);
109109

110-
let mut rt = tokio::runtime::Runtime::new().unwrap();
110+
let mut rt = runtime::Runtime::new().unwrap();
111111

112112
let output_lines = count_receive(&out_addr);
113113

@@ -153,7 +153,7 @@ fn benchmark_simple_pipe_with_huge_lines(c: &mut Criterion) {
153153
sinks::tcp::TcpSinkConfig::new(out_addr.to_string()),
154154
);
155155

156-
let mut rt = tokio::runtime::Runtime::new().unwrap();
156+
let mut rt = runtime::Runtime::new().unwrap();
157157

158158
let output_lines = count_receive(&out_addr);
159159

@@ -200,7 +200,7 @@ fn benchmark_simple_pipe_with_many_writers(c: &mut Criterion) {
200200
sinks::tcp::TcpSinkConfig::new(out_addr.to_string()),
201201
);
202202

203-
let mut rt = tokio::runtime::Runtime::new().unwrap();
203+
let mut rt = runtime::Runtime::new().unwrap();
204204

205205
let output_lines = count_receive(&out_addr);
206206

@@ -264,7 +264,7 @@ fn benchmark_interconnected(c: &mut Criterion) {
264264
sinks::tcp::TcpSinkConfig::new(out_addr2.to_string()),
265265
);
266266

267-
let mut rt = tokio::runtime::Runtime::new().unwrap();
267+
let mut rt = runtime::Runtime::new().unwrap();
268268

269269
let output_lines1 = count_receive(&out_addr1);
270270
let output_lines2 = count_receive(&out_addr2);
@@ -331,7 +331,7 @@ fn benchmark_transforms(c: &mut Criterion) {
331331
&["filter"],
332332
sinks::tcp::TcpSinkConfig::new(out_addr.to_string()),
333333
);
334-
let mut rt = tokio::runtime::Runtime::new().unwrap();
334+
let mut rt = runtime::Runtime::new().unwrap();
335335

336336
let output_lines = count_receive(&out_addr);
337337

@@ -485,7 +485,7 @@ fn benchmark_complex(c: &mut Criterion) {
485485
&["filter_500"],
486486
sinks::tcp::TcpSinkConfig::new(out_addr_500.to_string()),
487487
);
488-
let mut rt = tokio::runtime::Runtime::new().unwrap();
488+
let mut rt = runtime::Runtime::new().unwrap();
489489

490490
let output_lines_all = count_receive(&out_addr_all);
491491
let output_lines_sampled = count_receive(&out_addr_sampled);

benches/buffering.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use vector::test_util::{
55
block_on, count_receive, next_addr, send_lines, shutdown_on_idle, wait_for_tcp,
66
};
77
use vector::topology::{self, config};
8-
use vector::{buffers::BufferConfig, sinks, sources};
8+
use vector::{buffers::BufferConfig, runtime, sinks, sources};
99

1010
fn benchmark_buffers(c: &mut Criterion) {
1111
let num_lines: usize = 100_000;
@@ -35,7 +35,7 @@ fn benchmark_buffers(c: &mut Criterion) {
3535
when_full: Default::default(),
3636
};
3737

38-
let mut rt = tokio::runtime::Runtime::new().unwrap();
38+
let mut rt = runtime::Runtime::new().unwrap();
3939

4040
let output_lines = count_receive(&out_addr);
4141

@@ -72,7 +72,7 @@ fn benchmark_buffers(c: &mut Criterion) {
7272
.into();
7373
config.global.data_dir = Some(data_dir.clone());
7474

75-
let mut rt = tokio::runtime::Runtime::new().unwrap();
75+
let mut rt = runtime::Runtime::new().unwrap();
7676

7777
let output_lines = count_receive(&out_addr);
7878

@@ -108,7 +108,7 @@ fn benchmark_buffers(c: &mut Criterion) {
108108
};
109109
config.global.data_dir = Some(data_dir2.clone());
110110

111-
let mut rt = tokio::runtime::Runtime::new().unwrap();
111+
let mut rt = runtime::Runtime::new().unwrap();
112112

113113
let output_lines = count_receive(&out_addr);
114114

benches/files.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::codec::{BytesCodec, FramedWrite};
77
use tokio::fs::OpenOptions;
88
use vector::test_util::random_lines;
99
use vector::{
10-
sinks, sources,
10+
runtime, sinks, sources,
1111
topology::{self, config},
1212
};
1313

@@ -54,7 +54,7 @@ fn benchmark_files_without_partitions(c: &mut Criterion) {
5454
},
5555
);
5656

57-
let mut rt = tokio::runtime::Runtime::new().unwrap();
57+
let mut rt = runtime::Runtime::new().unwrap();
5858
let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
5959

6060
let mut options = OpenOptions::new();

benches/http.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use hyper::{Body, Response, Server};
55
use std::net::SocketAddr;
66
use vector::test_util::{next_addr, random_lines, send_lines, wait_for_tcp};
77
use vector::{
8-
sinks, sources,
8+
runtime, sinks, sources,
99
topology::{self, config},
1010
};
1111

@@ -33,7 +33,7 @@ fn benchmark_http_no_compression(c: &mut Criterion) {
3333
},
3434
);
3535

36-
let mut rt = tokio::runtime::Runtime::new().unwrap();
36+
let mut rt = runtime::Runtime::new().unwrap();
3737

3838
let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
3939
wait_for_tcp(in_addr);
@@ -80,7 +80,7 @@ fn benchmark_http_gzip(c: &mut Criterion) {
8080
},
8181
);
8282

83-
let mut rt = tokio::runtime::Runtime::new().unwrap();
83+
let mut rt = runtime::Runtime::new().unwrap();
8484

8585
let (topology, _crash) = topology::start(config, &mut rt, false).unwrap();
8686
wait_for_tcp(in_addr);

lib/tracing-limit/benches/limit.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ fn bench(c: &mut Criterion) {
4141
c.bench_function_over_inputs(
4242
"Limit 5 seconds",
4343
|b, n| {
44-
let sub = VisitingSubscriber(Mutex::new(String::from(""))).with(Limit::default());;
44+
let sub = VisitingSubscriber(Mutex::new(String::from(""))).with(Limit::default());
4545
let n = black_box(n);
4646
tracing::subscriber::with_default(sub, || {
4747
b.iter(|| {

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ pub mod buffers;
1515
pub mod event;
1616
pub mod metrics;
1717
pub mod region;
18+
pub mod runtime;
1819
pub mod sinks;
1920
pub mod sources;
2021
pub mod template;

src/main.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use structopt::StructOpt;
1212
use tokio_signal::unix::{Signal, SIGHUP, SIGINT, SIGQUIT, SIGTERM};
1313
use topology::Config;
1414
use tracing_futures::Instrument;
15-
use vector::{metrics, topology, trace};
15+
use vector::{metrics, runtime, topology, trace};
1616

1717
#[derive(StructOpt, Debug)]
1818
#[structopt(rename_all = "kebab-case")]
@@ -206,12 +206,9 @@ fn main() {
206206
});
207207

208208
let mut rt = {
209-
let mut builder = tokio::runtime::Builder::new();
210-
211209
let threads = opts.threads.unwrap_or(max(1, num_cpus::get()));
212-
builder.core_threads(min(4, threads));
213-
214-
builder.build().expect("Unable to create async runtime")
210+
let num_threads = min(4, threads);
211+
runtime::Runtime::with_thread_count(num_threads).expect("Unable to create async runtime")
215212
};
216213

217214
let (metrics_trigger, metrics_tripwire) = stream_cancel::Tripwire::new();

src/runtime.rs

+70
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
use futures::future::{ExecuteError, Executor, Future};
2+
use std::io;
3+
use tokio::runtime::Builder;
4+
5+
pub struct Runtime {
6+
rt: tokio::runtime::Runtime,
7+
}
8+
9+
impl Runtime {
10+
pub fn new() -> io::Result<Self> {
11+
Ok(Runtime {
12+
rt: tokio::runtime::Runtime::new()?,
13+
})
14+
}
15+
16+
pub fn single_threaded() -> io::Result<Self> {
17+
Self::with_thread_count(1)
18+
}
19+
20+
pub fn with_thread_count(number: usize) -> io::Result<Self> {
21+
Ok(Runtime {
22+
rt: Builder::new().core_threads(number).build()?,
23+
})
24+
}
25+
26+
pub fn spawn<F>(&mut self, future: F) -> &mut Self
27+
where
28+
F: Future<Item = (), Error = ()> + Send + 'static,
29+
{
30+
self.rt.spawn(future);
31+
self
32+
}
33+
34+
pub fn executor(&self) -> TaskExecutor {
35+
TaskExecutor {
36+
inner: self.rt.executor(),
37+
}
38+
}
39+
40+
pub fn block_on<F, R, E>(&mut self, future: F) -> Result<R, E>
41+
where
42+
F: Send + 'static + Future<Item = R, Error = E>,
43+
R: Send + 'static,
44+
E: Send + 'static,
45+
{
46+
self.rt.block_on(future)
47+
}
48+
49+
pub fn shutdown_on_idle(self) -> impl Future<Item = (), Error = ()> {
50+
self.rt.shutdown_on_idle()
51+
}
52+
53+
pub fn shutdown_now(self) -> impl Future<Item = (), Error = ()> {
54+
self.rt.shutdown_now()
55+
}
56+
}
57+
58+
#[derive(Clone, Debug)]
59+
pub struct TaskExecutor {
60+
inner: tokio::runtime::TaskExecutor,
61+
}
62+
63+
impl<F> Executor<F> for TaskExecutor
64+
where
65+
F: Future<Item = (), Error = ()> + Send + 'static,
66+
{
67+
fn execute(&self, future: F) -> Result<(), ExecuteError<F>> {
68+
self.inner.execute(future)
69+
}
70+
}

src/sinks/aws_kinesis_streams.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -330,13 +330,13 @@ mod integration_tests {
330330
use crate::{
331331
buffers::Acker,
332332
region::RegionOrEndpoint,
333+
runtime,
333334
test_util::{random_lines_with_stream, random_string},
334335
};
335336
use futures::{Future, Sink};
336337
use rusoto_core::Region;
337338
use rusoto_kinesis::{Kinesis, KinesisClient};
338339
use std::sync::Arc;
339-
use tokio::runtime::Runtime;
340340

341341
#[test]
342342
fn kinesis_put_records() {
@@ -356,7 +356,7 @@ mod integration_tests {
356356
..Default::default()
357357
};
358358

359-
let mut rt = Runtime::new().unwrap();
359+
let mut rt = runtime::Runtime::new().unwrap();
360360

361361
let sink = KinesisService::new(config, Acker::Null).unwrap();
362362

src/sinks/aws_s3.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ mod integration_tests {
400400
assert_downcast_matches,
401401
event::Event,
402402
region::RegionOrEndpoint,
403+
runtime::Runtime,
403404
sinks::aws_s3::{S3Sink, S3SinkConfig},
404405
test_util::{block_on, random_lines_with_stream, random_string},
405406
};
@@ -502,7 +503,7 @@ mod integration_tests {
502503
let (tx, rx) = futures::sync::mpsc::channel(1);
503504
let pump = sink.send_all(rx).map(|_| ()).map_err(|_| ());
504505

505-
let mut rt = tokio::runtime::Runtime::new().unwrap();
506+
let mut rt = Runtime::new().unwrap();
506507
rt.spawn(pump);
507508

508509
let mut tx = tx.wait();
@@ -588,15 +589,15 @@ mod integration_tests {
588589

589590
#[test]
590591
fn s3_healthchecks() {
591-
let mut rt = tokio::runtime::Runtime::new().unwrap();
592+
let mut rt = Runtime::new().unwrap();
592593

593594
let healthcheck = S3Sink::healthcheck(&config()).unwrap();
594595
rt.block_on(healthcheck).unwrap();
595596
}
596597

597598
#[test]
598599
fn s3_healthchecks_invalid_bucket() {
599-
let mut rt = tokio::runtime::Runtime::new().unwrap();
600+
let mut rt = Runtime::new().unwrap();
600601

601602
let config = S3SinkConfig {
602603
bucket: "asdflkjadskdaadsfadf".to_string(),

src/sinks/http.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ mod tests {
293293
use crate::buffers::Acker;
294294
use crate::{
295295
assert_downcast_matches,
296+
runtime::Runtime,
296297
sinks::http::HttpSinkConfig,
297298
test_util::{next_addr, random_lines_with_stream, shutdown_on_idle},
298299
topology::config::SinkConfig,
@@ -387,7 +388,7 @@ mod tests {
387388
let (input_lines, events) = random_lines_with_stream(100, num_lines);
388389
let pump = sink.send_all(events);
389390

390-
let mut rt = tokio::runtime::Runtime::new().unwrap();
391+
let mut rt = Runtime::new().unwrap();
391392
rt.spawn(server);
392393

393394
rt.block_on(pump).unwrap();
@@ -445,7 +446,7 @@ mod tests {
445446
let (input_lines, events) = random_lines_with_stream(100, num_lines);
446447
let pump = sink.send_all(events);
447448

448-
let mut rt = tokio::runtime::Runtime::new().unwrap();
449+
let mut rt = Runtime::new().unwrap();
449450
rt.spawn(server);
450451

451452
rt.block_on(pump).unwrap();
@@ -503,7 +504,7 @@ mod tests {
503504
let (input_lines, events) = random_lines_with_stream(100, num_lines);
504505
let pump = sink.send_all(events);
505506

506-
let mut rt = tokio::runtime::Runtime::new().unwrap();
507+
let mut rt = Runtime::new().unwrap();
507508
rt.spawn(server);
508509

509510
rt.block_on(pump).unwrap();

src/sinks/util/http.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ mod test {
224224
.serve(new_service)
225225
.map_err(|e| eprintln!("server error: {}", e));
226226

227-
let mut rt = tokio::runtime::Runtime::new().unwrap();
227+
let mut rt = crate::runtime::Runtime::new().unwrap();
228228

229229
rt.spawn(server);
230230

src/sinks/util/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -219,10 +219,10 @@ where
219219
mod test {
220220
use super::BatchServiceSink;
221221
use crate::buffers::Acker;
222+
use crate::runtime::Runtime;
222223
use crate::test_util::wait_for;
223224
use futures::{stream, sync::oneshot, Future, Poll, Sink};
224225
use std::sync::{atomic::Ordering, Arc, Mutex};
225-
use tokio::runtime::Runtime;
226226
use tower::Service;
227227

228228
struct FakeService {

0 commit comments

Comments
 (0)