Skip to content

Commit e596cc1

Browse files
authored
chore(topology): Refactor source shutdown and make it two-phase (#1994)
* chore(topology): Refactor source shutdown and make it two-phase (#1091) Phase one is signaling to all sources to shut down, phase 2 is waiting for them finish shutting down gracefully, or forcing shutdown if they don't finish within a time limit (currently 3 seconds) Signed-off-by: Spencer T Brody <spencer.t.brody@gmail.com>
1 parent ccd3cf7 commit e596cc1

27 files changed

+875
-68
lines changed

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ pub mod metrics;
2222
pub mod region;
2323
pub mod runtime;
2424
pub mod serde;
25+
pub mod shutdown;
2526
pub mod sinks;
2627
pub mod sources;
28+
pub mod stream;
2729
pub mod template;
2830
pub mod test_util;
2931
pub mod tls;

src/shutdown.rs

+311
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
use crate::runtime;
2+
use futures01::{future, stream::Stream, Async, Future};
3+
use std::collections::HashMap;
4+
use std::sync::Arc;
5+
use std::time::{Duration, Instant};
6+
use stream_cancel::{Trigger, Tripwire};
7+
use tokio::timer;
8+
9+
/// When this struct goes out of scope and its internal refcount goes to 0 it is a signal that its
10+
/// corresponding Source has completed executing and may be cleaned up. It is the responsibility
11+
/// of each Source to ensure that at least one copy of this handle remains alive for the entire
12+
/// lifetime of the Source.
13+
#[derive(Clone, Debug)]
14+
pub struct ShutdownSignalToken {
15+
_shutdown_complete: Arc<Trigger>,
16+
}
17+
18+
impl ShutdownSignalToken {
19+
fn new(shutdown_complete: Trigger) -> Self {
20+
Self {
21+
_shutdown_complete: Arc::new(shutdown_complete),
22+
}
23+
}
24+
}
25+
26+
/// Passed to each Source to coordinate the global shutdown process.
27+
#[derive(Clone, Debug)]
28+
pub struct ShutdownSignal {
29+
/// This will be triggered when global shutdown has begun, and is a sign to the Source to begin
30+
/// its shutdown process.
31+
begin_shutdown: Tripwire,
32+
33+
/// When a Source allows this to go out of scope it informs the global shutdown coordinator that
34+
/// this Source's local shutdown process is complete.
35+
/// Optional only so that `poll()` can move the handle out and return it.
36+
shutdown_complete: Option<ShutdownSignalToken>,
37+
}
38+
39+
impl Future for ShutdownSignal {
40+
type Item = ShutdownSignalToken;
41+
type Error = ();
42+
fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
43+
match self.begin_shutdown.poll() {
44+
Ok(Async::Ready(_)) => Ok(Async::Ready(self.shutdown_complete.take().unwrap())),
45+
Ok(Async::NotReady) => Ok(Async::NotReady),
46+
Err(_) => Err(()),
47+
}
48+
}
49+
}
50+
51+
impl ShutdownSignal {
52+
pub fn new(begin_shutdown: Tripwire, shutdown_complete: Trigger) -> Self {
53+
Self {
54+
begin_shutdown,
55+
shutdown_complete: Some(ShutdownSignalToken::new(shutdown_complete)),
56+
}
57+
}
58+
59+
#[cfg(test)]
60+
pub fn noop() -> Self {
61+
let (trigger, tripwire) = Tripwire::new();
62+
Self {
63+
begin_shutdown: tripwire,
64+
shutdown_complete: Some(ShutdownSignalToken::new(trigger)),
65+
}
66+
}
67+
}
68+
69+
pub struct ShutdownCoordinator {
70+
shutdown_begun_triggers: HashMap<String, Trigger>,
71+
shutdown_force_triggers: HashMap<String, Trigger>,
72+
shutdown_complete_tripwires: HashMap<String, Tripwire>,
73+
}
74+
75+
impl ShutdownCoordinator {
76+
pub fn new() -> Self {
77+
Self {
78+
shutdown_begun_triggers: HashMap::new(),
79+
shutdown_complete_tripwires: HashMap::new(),
80+
shutdown_force_triggers: HashMap::new(),
81+
}
82+
}
83+
84+
/// Creates the necessary Triggers and Tripwires for coordinating shutdown of this Source and
85+
/// stores them as needed. Returns the ShutdownSignal for this Source as well as a Tripwire
86+
/// that will be notified if the Source should be forcibly shut down.
87+
pub fn register_source(
88+
&mut self,
89+
name: &str,
90+
) -> (ShutdownSignal, impl Future<Item = (), Error = ()>) {
91+
let (shutdown_begun_trigger, shutdown_begun_tripwire) = Tripwire::new();
92+
let (force_shutdown_trigger, force_shutdown_tripwire) = Tripwire::new();
93+
let (shutdown_complete_trigger, shutdown_complete_tripwire) = Tripwire::new();
94+
95+
self.shutdown_begun_triggers
96+
.insert(name.to_string(), shutdown_begun_trigger);
97+
self.shutdown_force_triggers
98+
.insert(name.to_string(), force_shutdown_trigger);
99+
self.shutdown_complete_tripwires
100+
.insert(name.to_string(), shutdown_complete_tripwire);
101+
102+
let shutdown_signal =
103+
ShutdownSignal::new(shutdown_begun_tripwire, shutdown_complete_trigger);
104+
105+
// shutdown_source_end drops the force_shutdown_trigger even on success when we should *not*
106+
// be shutting down. Dropping the trigger will cause the Tripwire to resolve with an error,
107+
// so we use or_else with future::empty() to make it so it never resolves if the Trigger is
108+
// prematurely dropped instead.
109+
let force_shutdown_tripwire = force_shutdown_tripwire.or_else(|_| future::empty());
110+
(shutdown_signal, force_shutdown_tripwire)
111+
}
112+
113+
/// Takes ownership of all internal state for the given source from another ShutdownCoordinator.
114+
pub fn takeover_source(&mut self, name: &str, other: &mut Self) {
115+
let existing = self.shutdown_begun_triggers.insert(
116+
name.to_string(),
117+
other.shutdown_begun_triggers.remove(name).expect(&format!(
118+
"Other ShutdownCoordinator didn't have a shutdown_begun_trigger for {}",
119+
name
120+
)),
121+
);
122+
if !existing.is_none() {
123+
panic!(
124+
"ShutdownCoordinator already has a shutdown_begin_trigger for source {}",
125+
name
126+
);
127+
}
128+
129+
let existing = self.shutdown_force_triggers.insert(
130+
name.to_string(),
131+
other.shutdown_force_triggers.remove(name).expect(&format!(
132+
"Other ShutdownCoordinator didn't have a shutdown_force_trigger for {}",
133+
name
134+
)),
135+
);
136+
if !existing.is_none() {
137+
panic!(
138+
"ShutdownCoordinator already has a shutdown_force_trigger for source {}",
139+
name
140+
);
141+
}
142+
143+
let existing = self.shutdown_complete_tripwires.insert(
144+
name.to_string(),
145+
other
146+
.shutdown_complete_tripwires
147+
.remove(name)
148+
.expect(&format!(
149+
"Other ShutdownCoordinator didn't have a shutdown_complete_tripwire for {}",
150+
name
151+
)),
152+
);
153+
if !existing.is_none() {
154+
panic!(
155+
"ShutdownCoordinator already has a shutdown_complete_tripwire for source {}",
156+
name
157+
);
158+
}
159+
}
160+
161+
pub fn shutdown_source_begin(&mut self, name: &str) {
162+
self.shutdown_begun_triggers
163+
.remove(name)
164+
.expect(&format!(
165+
"shutdown_begun_trigger for source '{}' not found in the ShutdownCoordinator",
166+
name
167+
))
168+
.cancel();
169+
}
170+
171+
/// Waits for the source to shut down until the deadline. If the source does not
172+
/// notify the shutdown_complete_tripwire for this source before the dealine, then signals
173+
/// the shutdown_force_trigger for this source to force it to shut down. Returns whether
174+
/// or not the source shutdown gracefully.
175+
// TODO: The timing and reporting logic is very similar to the logic in
176+
// `RunningTopology::stop()`. Once `RunningTopology::stop()` has been updated to utilize the
177+
// ShutdownCoordinator, see if some of this logic can be de-duped.
178+
pub fn shutdown_source_end(
179+
&mut self,
180+
rt: &mut runtime::Runtime,
181+
name: String,
182+
deadline: Instant,
183+
) -> bool {
184+
let name2 = name.clone();
185+
let name3 = name.clone();
186+
let shutdown_complete_tripwire =
187+
self.shutdown_complete_tripwires
188+
.remove(&name)
189+
.expect(&format!(
190+
"shutdown_complete_tripwire for source '{}' not found in the ShutdownCoordinator",
191+
name
192+
));
193+
let shutdown_force_trigger = self.shutdown_force_triggers.remove(&name).expect(&format!(
194+
"shutdown_force_trigger for source '{}' not found in the ShutdownCoordinator",
195+
name
196+
));
197+
198+
let success = shutdown_complete_tripwire.map(move |_| {
199+
info!("Source \"{}\" shut down successfully", name);
200+
});
201+
let timeout = timer::Delay::new(deadline)
202+
.map(move |_| {
203+
error!(
204+
"Source '{}' failed to shutdown before deadline. Forcing shutdown.",
205+
name2,
206+
);
207+
})
208+
.map_err(|err| panic!("Timer error: {:?}", err));
209+
let reporter = timer::Interval::new_interval(Duration::from_secs(5))
210+
.inspect(move |_| {
211+
let time_remaining = if deadline > Instant::now() {
212+
format!(
213+
"{} seconds remaining",
214+
(deadline - Instant::now()).as_secs()
215+
)
216+
} else {
217+
"overdue".to_string()
218+
};
219+
220+
info!(
221+
"Still waiting on source \"{}\" to shut down. {}",
222+
name3, time_remaining,
223+
);
224+
})
225+
.filter(|_| false) // Run indefinitely without emitting items
226+
.into_future()
227+
.map(|_| ())
228+
.map_err(|(err, _)| panic!("Timer error: {:?}", err));
229+
230+
let union = future::select_all::<Vec<Box<dyn Future<Item = (), Error = ()> + Send>>>(vec![
231+
Box::new(success),
232+
Box::new(timeout),
233+
Box::new(reporter),
234+
]);
235+
236+
// Now block until one of the futures resolves and use the index of the resolved future
237+
// to decide whether it was the success or timeout future that resolved first.
238+
let index = match rt.block_on(union) {
239+
Ok((_, index, _)) => index,
240+
Err((err, _, _)) => panic!(
241+
"Error from select_all future while waiting for source to shut down: {:?}",
242+
err
243+
),
244+
};
245+
246+
let success = if index == 0 {
247+
true
248+
} else if index == 1 {
249+
false
250+
} else {
251+
panic!(
252+
"Neither success nor timeout future finished. Index finished: {}",
253+
index
254+
);
255+
};
256+
if success {
257+
shutdown_force_trigger.disable();
258+
} else {
259+
shutdown_force_trigger.cancel();
260+
}
261+
success
262+
}
263+
}
264+
265+
#[cfg(test)]
266+
mod test {
267+
use crate::runtime;
268+
use crate::shutdown::ShutdownCoordinator;
269+
use futures01::future::Future;
270+
use std::time::{Duration, Instant};
271+
272+
#[test]
273+
fn shutdown_coordinator_shutdown_source_clean() {
274+
let mut rt = runtime::Runtime::new().unwrap();
275+
let mut shutdown = ShutdownCoordinator::new();
276+
let name = "test";
277+
278+
let (shutdown_signal, _) = shutdown.register_source(name);
279+
280+
shutdown.shutdown_source_begin(name);
281+
282+
drop(shutdown_signal);
283+
284+
let deadline = Instant::now() + Duration::from_secs(1);
285+
assert_eq!(
286+
true,
287+
shutdown.shutdown_source_end(&mut rt, name.to_string(), deadline)
288+
);
289+
}
290+
291+
#[test]
292+
fn shutdown_coordinator_shutdown_source_force() {
293+
let mut rt = runtime::Runtime::new().unwrap();
294+
let mut shutdown = ShutdownCoordinator::new();
295+
let name = "test";
296+
297+
let (_shutdown_signal, force_shutdown_tripwire) = shutdown.register_source(name);
298+
299+
shutdown.shutdown_source_begin(name);
300+
301+
// Since we never drop the ShutdownSignal the ShutdownCoordinator assumes the Source is
302+
// still running and must force shutdown.
303+
let deadline = Instant::now() + Duration::from_secs(1);
304+
assert_eq!(
305+
false,
306+
shutdown.shutdown_source_end(&mut rt, name.to_string(), deadline)
307+
);
308+
309+
assert!(force_shutdown_tripwire.wait().is_ok());
310+
}
311+
}

src/sources/docker.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::{
22
event::merge_state::LogEventMergeState,
33
event::{self, Event, Value},
4+
shutdown::ShutdownSignal,
45
topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
56
};
67
use bytes::{Bytes, BytesMut};
@@ -104,6 +105,7 @@ impl SourceConfig for DockerConfig {
104105
&self,
105106
_name: &str,
106107
_globals: &GlobalOptions,
108+
_shutdown: ShutdownSignal,
107109
out: Sender<Event>,
108110
) -> crate::Result<super::Source> {
109111
DockerSource::new(
@@ -994,7 +996,12 @@ mod tests {
994996
let (sender, recv) = mpsc::channel(100);
995997
rt.spawn(
996998
config
997-
.build("default", &GlobalOptions::default(), sender)
999+
.build(
1000+
"default",
1001+
&GlobalOptions::default(),
1002+
ShutdownSignal::noop(),
1003+
sender,
1004+
)
9981005
.unwrap(),
9991006
);
10001007
recv

src/sources/file/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::{
22
event::{self, Event},
3+
shutdown::ShutdownSignal,
34
topology::config::{DataType, GlobalOptions, SourceConfig, SourceDescription},
45
trace::{current_span, Instrument},
56
};
@@ -186,6 +187,7 @@ impl SourceConfig for FileConfig {
186187
&self,
187188
name: &str,
188189
globals: &GlobalOptions,
190+
_shutdown: ShutdownSignal,
189191
out: mpsc::Sender<Event>,
190192
) -> crate::Result<super::Source> {
191193
// add the source name as a subdir, so that multiple sources can

0 commit comments

Comments
 (0)