Implement a proper topology stop strategy #1091

Closed
LucioFranco opened this issue Oct 25, 2019 · 4 comments · Fixed by #1994 or #2098
Assignees
Labels
domain: topology (Anything related to Vector's topology code)
type: bug (A code related bug.)
type: enhancement (A value-adding code change that enhances its existing functionality.)
type: tech debt (A code change that does not add user value.)

Comments

@LucioFranco
Contributor

LucioFranco commented Oct 25, 2019

Motivation

Currently, our topology stop method attempts to shut down the topology tasks by selecting against them and dropping any resources. This is incorrect: we should instead notify each task that we need to shut down and let it shut down gracefully.

Prior art:

https://docs.rs/tokio-evacuate/
https://github.com/linkerd/linkerd2-proxy/blob/master/linkerd/app/core/src/serve.rs#L13
https://github.com/sfackler/futures-shutdown

Proposal

Our codebase currently does this https://github.com/timberio/vector/blob/master/src/topology/builder.rs#L128, which force-cancels a task. In the context of TCP this is fine, because we create a second tripwire that fires when the original task gets dropped and thus cancels all of its child tasks.

This does not work, though, for something like hyper, which wants a signal to shut down, as shown here: https://docs.rs/hyper/0.13.0-alpha.4/hyper/server/struct.Server.html#method.with_graceful_shutdown. What this means is that we were never "gracefully" shutting down in the first place; we were forcing the task to quit.
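
For context, here is a minimal sketch of the graceful-shutdown pattern that hyper link describes, written against the later hyper 0.14 API (the issue links 0.13.0-alpha.4, but with_graceful_shutdown works the same way) and using a Ctrl-C trigger purely for illustration. The server is handed a future and, once that future resolves, stops accepting connections and drains in-flight ones instead of having its task force-cancelled:

use std::convert::Infallible;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};

async fn hello(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("hello")))
}

#[tokio::main]
async fn main() {
    // The sender side is the "shutdown signal"; hyper drains in-flight
    // connections once the receiver future resolves.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

    tokio::spawn(async move {
        // For this sketch, begin the graceful shutdown on Ctrl-C.
        let _ = tokio::signal::ctrl_c().await;
        let _ = shutdown_tx.send(());
    });

    let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(hello)) });

    let server = Server::bind(&([127, 0, 0, 1], 3000).into())
        .serve(make_svc)
        .with_graceful_shutdown(async {
            shutdown_rx.await.ok();
        });

    if let Err(e) = server.await {
        eprintln!("server error: {}", e);
    }
}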

My proposal is to invert this by providing each spawned task with a handle that can produce a future which resolves once the shutdown process has started. Each handle is correlated with one task, and that task is considered part of the group of tasks connected to that one instance of the topology. When you reload the topology, we should shut down all tasks that were spawned from that config. The shutdown should first signal each task so it can start its cleanup; once a task has finished its cleanup, its handle gets dropped. Once all handles are dropped, the main shutdown future can complete. That main future, driven from Topology::stop, can be selected against a timer to produce a graceful shutdown period followed by a forced shutdown if tasks hang.
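
To make the proposal concrete, here is a rough sketch of that coordinator/handle split. It is illustration only, not Vector's implementation: the type names ShutdownCoordinator and ShutdownHandle are made up, and it uses today's tokio primitives rather than the futures 0.1 APIs Vector used at the time. A watch channel carries the "shutdown has started" signal, and an mpsc sender held by each handle lets the coordinator observe when every handle has been dropped:

use std::time::Duration;

use tokio::sync::{mpsc, watch};
use tokio::time::timeout;

// Hypothetical types for illustration; not Vector's actual API.
struct ShutdownCoordinator {
    signal_tx: watch::Sender<bool>,
    done_tx: mpsc::Sender<()>,
    done_rx: mpsc::Receiver<()>,
}

struct ShutdownHandle {
    signal_rx: watch::Receiver<bool>,
    _done: mpsc::Sender<()>, // dropped when the owning task finishes its cleanup
}

impl ShutdownCoordinator {
    fn new() -> Self {
        let (signal_tx, _) = watch::channel(false);
        let (done_tx, done_rx) = mpsc::channel(1);
        Self { signal_tx, done_tx, done_rx }
    }

    // One handle per task spawned for this instance of the topology.
    fn handle(&self) -> ShutdownHandle {
        ShutdownHandle {
            signal_rx: self.signal_tx.subscribe(),
            _done: self.done_tx.clone(),
        }
    }

    // Signal every task, then wait up to `grace` for all handles to be dropped.
    async fn shutdown_with_timeout(self, grace: Duration) -> Result<(), ()> {
        let ShutdownCoordinator { signal_tx, done_tx, mut done_rx } = self;
        let _ = signal_tx.send(true); // tell every handle that shutdown has started
        drop(done_tx);                // only the task-held sender clones remain now
        match timeout(grace, done_rx.recv()).await {
            // recv() yields None once every sender (handle) has been dropped.
            Ok(None) => Ok(()),
            // Timed out: the caller should force-cancel the stragglers.
            _ => Err(()),
        }
    }
}

impl ShutdownHandle {
    // Resolves once the shutdown process has started.
    async fn signaled(&mut self) {
        let already_signaled = *self.signal_rx.borrow();
        if !already_signaled {
            let _ = self.signal_rx.changed().await;
        }
    }
}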

Here is an example I threw together last night that shows how we might be able to expose this concept to users that want to implement sources, transforms, and sinks. https://github.com/LucioFranco/iquit/blob/master/src/work.rs#L24

One big reason I think we should provide a better API, as well as implement this properly, is that getting shutdown right is hard. Because it's hard, it's unrealistic to expect contributors to implement it correctly by hand the way the TCP source does. With a nice API and good documentation, we can ensure that all of our sources, transforms, and sinks shut down properly in the future.

Ideally, we could provide something like this:

let shutdown = Shutdown::new();

// This will use _some_ executor to spawn a task.
//
// Internally this registers a new task and gets a handle to that
// registration, then calls this closure with the handle so that the
// inner task future can do something with it. It will also wrap the
// Future returned from `SourceConfig::build` with a select against a
// oneshot channel, like we do now, so that we can force shut it down
// if it does not shut down gracefully.
shutdown.spawn(|handle| source.build(name, globals, out, handle));

// Now let's attempt to shut it down.
if let Err(_) = rt.block_on(shutdown.shutdown_with_timeout(Duration::from_secs(60))) {
    // Graceful shutdown failed, so let's force shutdown everything.
    rt.block_on(shutdown.force());
}
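
Continuing the same hypothetical sketch (again, not Vector's actual API), the consumer side might look like the following: a source's event loop selects on the handle's shutdown future, does its cleanup, and then simply returns, and dropping the handle on the way out is what tells the coordinator that this task has finished:

use tokio::sync::mpsc;

// Placeholder event type and processing step, purely for the example.
struct Event;
fn process(_event: Event) {}

async fn run_source(mut handle: ShutdownHandle, mut events: mpsc::Receiver<Event>) {
    loop {
        tokio::select! {
            // Resolves once Topology::stop (or a reload) starts the shutdown.
            _ = handle.signaled() => {
                // Flush buffers, close listeners, etc., then fall through.
                break;
            }
            maybe_event = events.recv() => match maybe_event {
                Some(event) => process(event),
                None => break, // upstream closed
            },
        }
    }
    // `handle` is dropped here, which is what tells the coordinator
    // that this task has finished shutting down.
}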

cc @lukesteensen @a-rodin

@LucioFranco LucioFranco added type: bug A code related bug. type: enhancement A value-adding code change that enhances its existing functionality. domain: topology Anything related to Vector's topology code type: tech debt A code change that does not add user value. needs: approval Needs review & approval before work can begin. labels Oct 25, 2019
@binarylogic
Contributor

Nice write-up! So it sounds like we should couple this with the upcoming contributing project, correct?

@lukesteensen
Member

👍 Your example looks good to me. Definitely want to keep the API as simple and hard to misuse as possible.

@stbrody
Contributor

stbrody commented Feb 22, 2020

I'm trying to understand how shutdown in the TCP source works currently. I see here we create a Trigger, Tripwire pair, and we later pass the Tripwire to the FramedRead's take_until method, but we seem to drop the Trigger on the floor without ever signaling it. My understanding of how the Trigger, Tripwire pairs are supposed to work is that the Tripwire Future is activated when the Trigger is signaled. Since the Trigger here is never signaled, doesn't that mean the Tripwire should never fire and take_until would run forever?

@LucioFranco @lukesteensen

@stbrody
Contributor

stbrody commented Feb 24, 2020

Ugh, nevermind. I had assumed that Trigger.cancel() did what Trigger.disable() actually does. I now see that this line is what signals the trigger to fire. I assume the future from the for_each line resolves and the inspect line activates when the socket is closed or errors.
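
For anyone else who trips over this: the key detail is that a stream-cancel Trigger fires its Tripwire when it is dropped, unless disable() was called first. Here is a tiny illustration of that "fire on drop" behaviour using a plain tokio oneshot channel rather than the stream-cancel API itself; the receiver resolves as soon as the sender is dropped, much like a Tripwire fires once its still-armed Trigger goes out of scope:

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Stand-ins for the Trigger/Tripwire pair; not the stream-cancel API.
    let (trigger, tripwire) = oneshot::channel::<()>();

    tokio::spawn(async move {
        // ... do some work, then let `trigger` go out of scope without sending ...
        drop(trigger);
    });

    // Resolves (with Err, since nothing was sent) as soon as `trigger` is dropped.
    let _ = tripwire.await;
    println!("tripwire fired");
}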

stbrody pushed a commit to stbrody/vector that referenced this issue Mar 6, 2020

Phase one is signaling to all sources to shut down; phase two is waiting for them
to finish shutting down gracefully, or forcing shutdown if they don't finish within
a time limit (currently 30 seconds)

Signed-off-by: Spencer T Brody <spencer.t.brody@gmail.com>
stbrody pushed a commit to stbrody/vector that referenced this issue Mar 9, 2020
…ordotdev#1091)

Phase one is signaling to all sources to shut down; phase two is waiting for them
to finish shutting down gracefully, or forcing shutdown if they don't finish within
a time limit (currently 30 seconds)

Signed-off-by: Spencer T Brody <spencer.t.brody@gmail.com>
stbrody added a commit that referenced this issue Mar 18, 2020
* chore(topology): Refactor source shutdown and make it two-phase (#1091)

Phase one is signaling to all sources to shut down; phase two is waiting for them
to finish shutting down gracefully, or forcing shutdown if they don't finish within
a time limit (currently 3 seconds)

Signed-off-by: Spencer T Brody <spencer.t.brody@gmail.com>
@Hoverbear Hoverbear reopened this Mar 18, 2020
@stbrody stbrody reopened this Mar 18, 2020