Spec out the "Simplify Shutdown" project #1894

Closed
binarylogic opened this issue Feb 22, 2020 · 5 comments
Labels
type: task Generic non-code related tasks


@binarylogic
Contributor

As explained in the "Plan of attack" section of #1550 (comment), we need to:

  1. Explore options for simplifying shutdown
  2. Write up proposal with spike-level code
  3. Discuss and come to a consensus on how to proceed

This issue tracks those three tasks.

@binarylogic added the type: task label Feb 22, 2020
@stbrody
Contributor

stbrody commented Mar 4, 2020

Design proposal for cleaning up Source shutdown

The core idea is to make the shutdown of all running Sources a two-phase process. In the first phase, the shutdown coordinator signals every Source to begin shutting down gracefully, and each Source signals back to the coordinator once its shutdown is complete. In the second phase, the coordinator waits for all Sources to report that their shutdown is complete, and forces any Source that has not finished to shut down after a given timeout.

Code changes to Sources

We will introduce a new type, ShutdownSignal, which will be passed to every new Source via its build() method. ShutdownSignal will be a Future whose resolution signals the Source to begin shutting down. When a ShutdownSignal Future resolves, it will yield a ShutdownCompleteHandle which, when dropped, signals to the shutdown coordinator that this Source’s shutdown is complete. Each Source will be responsible for ensuring that a copy of the ShutdownCompleteHandle remains alive until the Source has finished all its work. ShutdownCompleteHandle will be Cloneable and will internally store a refcount, so that the shutdown coordinator is only signaled after the last copy of the ShutdownCompleteHandle is dropped. Internally, ShutdownCompleteHandle will be a thin wrapper around an Arc<stream_cancel::Trigger> (which is itself a thin wrapper around oneshot::Sender).
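
To make the "signal only on the last drop" behavior concrete, here is a minimal std-only sketch. It is not the proposed implementation: it assumes a std::sync::mpsc channel as a stand-in for the stream_cancel Trigger/Tripwire pair (a Receiver only reports disconnection once every Sender is gone, which gives the same fire-on-last-drop semantics), and CompleteTripwire, wait and complete_pair are hypothetical names introduced for illustration.

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::time::Duration;

/// Stand-in for `stream_cancel::Trigger` (assumption): the receiving side
/// observes completion when this value, and thus its Sender, is dropped.
struct Trigger {
    _tx: Sender<()>,
}

/// Cloneable handle; the Arc is the refcount, so the coordinator is only
/// notified once the last clone has been dropped.
#[derive(Clone)]
pub struct ShutdownCompleteHandle {
    _trigger: Arc<Trigger>,
}

/// Hypothetical stand-in for the coordinator's Tripwire.
pub struct CompleteTripwire {
    rx: Receiver<()>,
}

impl CompleteTripwire {
    /// Returns true if every handle clone was dropped within `timeout`.
    pub fn wait(&self, timeout: Duration) -> bool {
        // Nothing is ever sent, so the only way to return early is
        // disconnection, i.e. the last Trigger (and its Sender) was dropped.
        matches!(self.rx.recv_timeout(timeout), Err(RecvTimeoutError::Disconnected))
    }
}

/// Hypothetical helper creating a connected handle/tripwire pair.
pub fn complete_pair() -> (ShutdownCompleteHandle, CompleteTripwire) {
    let (tx, rx) = channel();
    let handle = ShutdownCompleteHandle { _trigger: Arc::new(Trigger { _tx: tx }) };
    (handle, CompleteTripwire { rx })
}

fn main() {
    let (handle, tripwire) = complete_pair();
    let clone = handle.clone();
    drop(handle);
    // One clone is still alive, so shutdown is not complete yet.
    assert!(!tripwire.wait(Duration::from_millis(10)));
    drop(clone);
    // The last clone is gone, so the tripwire fires.
    assert!(tripwire.wait(Duration::from_millis(10)));
    println!("shutdown complete");
}
```

A Source that hands clones of the handle to its spawned subtasks therefore keeps the coordinator waiting until every one of those subtasks has finished and dropped its clone.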

To make it easier for Source implementations to tie the lifetime of the ShutdownCompleteHandle to their work, we will introduce new Stream combinators as needed. Two that I already know we’ll need are a WithHandle combinator (created via a with_handle(handle) method on the Stream), which simply stores the handle for the lifetime of the stream, and a TakeUntil combinator (created via a take_until(Future) method on the Stream), which lets the underlying stream run until the provided Future resolves. This TakeUntil is essentially the stream_cancel::TakeUntil combinator provided by the stream_cancel::StreamExt trait; the only difference is that our TakeUntil will store the result of the provided Future internally. Ideally, the changes we need to TakeUntil could be upstreamed into the stream_cancel library itself.
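
The distinguishing feature of the proposed TakeUntil is that it keeps the value the "until" Future resolved with. As a rough, synchronous illustration of that idea only, the sketch below swaps in an Iterator for the Stream and a std::sync::mpsc::Receiver for the Future; both substitutions and the resolved() accessor are assumptions made so the example runs with the standard library alone.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Iterator analogue (assumption) of the proposed `TakeUntil` combinator:
/// yields items from `inner` until `signal` fires, then keeps the signal's value.
pub struct TakeUntil<I, T> {
    inner: I,
    signal: Receiver<T>,
    resolved: Option<T>,
}

impl<I, T> TakeUntil<I, T> {
    pub fn new(inner: I, signal: Receiver<T>) -> Self {
        TakeUntil { inner, signal, resolved: None }
    }

    /// The value the signal carried, once it has fired (hypothetical accessor).
    pub fn resolved(&self) -> Option<&T> {
        self.resolved.as_ref()
    }
}

impl<I: Iterator, T> Iterator for TakeUntil<I, T> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Once the signal has fired, the combinator stays finished.
        if self.resolved.is_some() {
            return None;
        }
        // Check the signal before pulling another item, the way TakeUntil
        // polls its Future ahead of the underlying stream.
        if let Ok(value) = self.signal.try_recv() {
            self.resolved = Some(value);
            return None;
        }
        self.inner.next()
    }
}

fn main() {
    let (tx, rx) = channel();
    let mut items = TakeUntil::new(1u32.., rx);
    assert_eq!(items.next(), Some(1));
    assert_eq!(items.next(), Some(2));
    tx.send("shutdown handle").unwrap();
    assert_eq!(items.next(), None);
    // Unlike a plain take-until, the signal's output is still available.
    assert_eq!(items.resolved(), Some(&"shutdown handle"));
}
```

Storing the output is what would let a Source keep whatever the ShutdownSignal resolved with (for example a ShutdownCompleteHandle) alive after the stream itself has stopped.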

Code changes to Topology

The RunningTopology struct currently stores a shutdown_triggers field, which is a HashMap<String, Trigger>. This will be replaced with a single ShutdownCoordinator struct, which will contain all the Triggers and Tripwires needed to coordinate the shutdown process, along with methods that the RunningTopology can call to drive it. Internally, the ShutdownCoordinator will contain three HashMaps:

shutdown_begin_triggers: HashMap<String, Trigger>,
shutdown_complete_tripwires: HashMap<String, Tripwire>,
shutdown_force_triggers: HashMap<String, Trigger>,

The shutdown_begin_triggers will hold, for each Source, the Trigger that causes that Source’s ShutdownSignal Future to resolve.
The shutdown_complete_tripwires will be notified when the corresponding ShutdownCompleteHandle is dropped (and its refcount reaches zero).
The shutdown_force_triggers will be the analogue of what is currently called shutdown_triggers: they cause the corresponding Source Task to be dropped even if it hasn’t finished shutting down yet. As today, this will be achieved by selecting on the Source Task and the Tripwire at the other end of these Triggers.

The ShutdownCoordinator will provide one pair of methods to begin and end shutdown of all Sources, and another pair to begin and end shutdown of a given subset of Sources (by name). The begin phase will signal the relevant Triggers in shutdown_begin_triggers. The end phase will wait for the relevant Tripwires in shutdown_complete_tripwires to be notified; for any that are not notified within some number of seconds of the shutdown_begin_triggers being signaled, it will signal the corresponding Triggers in shutdown_force_triggers.
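
The two phases can be sketched with threads and the standard library. This is an illustration under stated assumptions, not the proposed async code: mpsc channels stand in for the Triggers and Tripwires, and an AtomicBool flag stands in for forcing (the real design drops the Source Task by selecting on a Tripwire). SourceShutdown, register, begin_shutdown, end_shutdown and shutdown_requested are hypothetical names; passing every registered name covers the "all Sources" pair of methods.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender, TryRecvError};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical per-Source bundle handed out by the coordinator.
pub struct SourceShutdown {
    begin: Receiver<()>,        // disconnects when shutdown begins
    pub complete: Sender<()>,   // dropping it reports shutdown complete
    pub force: Arc<AtomicBool>, // set if the Source misses the deadline
}

impl SourceShutdown {
    /// True once the coordinator has signaled the begin phase.
    pub fn shutdown_requested(&self) -> bool {
        matches!(self.begin.try_recv(), Err(TryRecvError::Disconnected))
    }
}

#[derive(Default)]
pub struct ShutdownCoordinator {
    shutdown_begin_triggers: HashMap<String, Sender<()>>,
    shutdown_complete_tripwires: HashMap<String, Receiver<()>>,
    shutdown_force_triggers: HashMap<String, Arc<AtomicBool>>,
}

impl ShutdownCoordinator {
    pub fn register(&mut self, name: &str) -> SourceShutdown {
        let (begin_tx, begin_rx) = channel();
        let (complete_tx, complete_rx) = channel();
        let force = Arc::new(AtomicBool::new(false));
        self.shutdown_begin_triggers.insert(name.to_string(), begin_tx);
        self.shutdown_complete_tripwires.insert(name.to_string(), complete_rx);
        self.shutdown_force_triggers.insert(name.to_string(), Arc::clone(&force));
        SourceShutdown { begin: begin_rx, complete: complete_tx, force }
    }

    /// Phase 1: dropping a begin trigger is what signals the Source.
    pub fn begin_shutdown(&mut self, names: &[&str]) {
        for name in names {
            self.shutdown_begin_triggers.remove(*name);
        }
    }

    /// Phase 2: wait until `deadline` for each Source to report completion,
    /// force any that have not, and return the names of the forced Sources.
    pub fn end_shutdown(&mut self, names: &[&str], deadline: Instant) -> Vec<String> {
        let mut forced = Vec::new();
        for name in names {
            let tripwire = match self.shutdown_complete_tripwires.remove(*name) {
                Some(tripwire) => tripwire,
                None => continue,
            };
            let force = self.shutdown_force_triggers.remove(*name);
            let remaining = deadline.saturating_duration_since(Instant::now());
            // Nothing is ever sent, so recv only returns early on disconnect,
            // i.e. once the Source has dropped its `complete` Sender.
            if let Err(RecvTimeoutError::Timeout) = tripwire.recv_timeout(remaining) {
                if let Some(force) = force {
                    force.store(true, Ordering::SeqCst);
                }
                forced.push(name.to_string());
            }
        }
        forced
    }
}

fn main() {
    let mut coordinator = ShutdownCoordinator::default();
    let polite = coordinator.register("polite");
    let stubborn = coordinator.register("stubborn");
    // Finishes as soon as shutdown begins; dropping `polite` reports completion.
    let t1 = thread::spawn(move || {
        while !polite.shutdown_requested() {
            thread::sleep(Duration::from_millis(1));
        }
    });
    // Ignores the begin signal and only stops when forced.
    let t2 = thread::spawn(move || {
        while !stubborn.force.load(Ordering::SeqCst) {
            thread::sleep(Duration::from_millis(1));
        }
    });
    let names = ["polite", "stubborn"];
    coordinator.begin_shutdown(&names);
    let forced = coordinator.end_shutdown(&names, Instant::now() + Duration::from_millis(250));
    t1.join().unwrap();
    t2.join().unwrap();
    assert_eq!(forced, vec!["stubborn".to_string()]);
}
```

Dropping a begin trigger plays the role of signaling it, mirroring how a stream_cancel Trigger fires when dropped. Passing one shared deadline into the end phase measures the timeout from when shutdown began, rather than restarting it for each Source in turn.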

Open questions

  • How long should the ShutdownCoordinator wait before forcing shutdown of the Sources? Is this worth adding a command-line flag for?
  • Do we think we’re going to want to do something similar for Sinks at some point? If so, and we do add a command-line flag for the shutdown timeout, should it be named generally enough to be reused as the Sink shutdown timeout as well, or should it be named specifically for Sources?

@stbrody
Contributor

stbrody commented Mar 4, 2020

This proposal is largely based on the POC I built with @LucioFranco's input. @lukesteensen, what do you think?

@LucioFranco
Contributor

Overall this looks great! One question I had is why is the ShutdownCompleteHandle clonable but ShutdownSignal is not? To me it makes much more sense to have the future be clonable and passed around than the resolved item?

@stbrody
Contributor

stbrody commented Mar 5, 2020

Overall this looks great! One question I had is why is the ShutdownCompleteHandle clonable but ShutdownSignal is not? To me it makes much more sense to have the future be clonable and passed around than the resolved item?

Yep, totally agree. I originally designed it that way because, back in the oh-so-long-ago time of 2 days ago when I didn't understand how Future combinators worked, I thought ShutdownSignal would need to provide direct access to its underlying ShutdownCompleteHandle for the custom shutdown logic in the TCP source to work. Now that I understand Future combinators better, I think everything we need can be done by cleverly combining the ShutdownSignal Future with the other work going on. I also think we no longer need the WithHandle combinator I proposed in the design above (though we'll still need to either make our own copy of take_until or commit a change upstream to stream_cancel). I've already made the change to the TCP source in my working copy; it seems pretty straightforward, and the abstraction layers look much nicer this way.

@binarylogic
Contributor Author

This seems to be done. Nice work.
