
RFC: structured concurrency #2596

Closed
carllerche opened this issue Jun 7, 2020 · 15 comments
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. C-proposal Category: a proposal and request for comments

Comments

@carllerche
Member

carllerche commented Jun 7, 2020

This proposal is mostly the result of @Matthias247's work (#1879, #2579, #2576).
I took this work, summarized the API and added a few tweaks as proposals.


Summary

A specific proposal for structured concurrency. The background info and
motivation have already been discussed in #1879, so this will be skipped here.

There are also two PRs with specific proposals: #2579, #2576.

This RFC builds upon that work.

Also related: #2576

The RFC proposes breaking changes for 0.3. "Initial steps" proposes a way to add
the functionality without breaking changes in 0.2.

Requirements

At a high level, structured concurrency requires that:

  • No tasks are leaked from a scope.
  • No error (an Err result or a panic) goes unhandled.

Again, see #2579 for more details.

Proposed Changes

  • All tasks must be spawned within a scope.
  • Each task has an implicit scope.
  • Explicit scopes may be created with task::scope.
  • task::signalled().await waits until the task is signalled, indicating it
    should gracefully terminate.
  • JoinHandle forcefully cancels the task on drop.
  • JoinHandle gains the following methods:
    • background(self): run the task in the background until the scope drops.
    • try_background(self): run the task in the background until the scope
      drops. If the task completes with Err, forcefully cancel the owning scope.
    • signal(&self) signals to the task to gracefully terminate.

Terminology

forceful cancellation: The runtime drops the spawned task as soon as possible,
without providing an opportunity to gracefully complete. All cleanup is done
synchronously in drop implementations.

graceful cancellation: Signal to a task it should stop processing and clean
up any resources itself.
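To make the forceful case concrete, here is a std-only sketch of why cleanup under forceful cancellation is limited to synchronous `Drop` code. The `Connection` type and `simulate_forceful_cancel` are illustrative names for this sketch, not Tokio API:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Hypothetical resource held by a task. Under forceful cancellation the
// only cleanup hook the task has is `Drop`, which must run synchronously.
struct Connection {
    closed: Arc<AtomicBool>,
}

impl Drop for Connection {
    fn drop(&mut self) {
        // No `.await` is possible here; cleanup must be ordinary sync code.
        self.closed.store(true, Ordering::SeqCst);
    }
}

fn simulate_forceful_cancel() -> bool {
    let closed = Arc::new(AtomicBool::new(false));
    let conn = Connection { closed: closed.clone() };
    drop(conn); // stands in for the runtime dropping a cancelled task
    closed.load(Ordering::SeqCst)
}

fn main() {
    assert!(simulate_forceful_cancel());
}
```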

Details

Scopes

All spawned tasks must be spawned within a scope. The task is bound to the
scope.

task::scope(async {

  let join_handle = tokio::spawn(async {
    // do work
  });

}).await;

This creates a new scope and executes the provided block within the context of
this scope. All calls to tokio::spawn link the task to the current scope. When
the block provided to scope completes, any spawned tasks that have not yet
completed are forcefully cancelled. Once all tasks are cancelled, the call to
task::scope(...).await completes.

Scopes do not attempt to handle graceful cancellation. Graceful cancellation
is provided by a separate set of utilities described below.

All tasks come with an implicit scope. In other words, spawning a task is
equivalent to:

tokio::spawn(async {
  task::scope(async {
    // task body
  }).await
})

There could also be a global scope that is used as a catch-all for tasks that
need to run in the background without being tied to a specific scope. This
global scope would be the "runtime global" scope.

Error propagation

As @Matthias247 pointed out in #1879, when using JoinHandle values, error
propagation is naturally handled using Rust's ? operator:

task::scope(async {

  let t1 = tokio::spawn(async { ... });
  let t2 = tokio::spawn(async { ... });

  t1.await?;
  t2.await?;

}).await?;

In this case, if t1 completes with Err, t1.await? will return from the
async { ... } block passed to task::scope. Once this happens, all
outstanding tasks are forcibly cancelled, resulting in no tasks leaking.

Dropping JoinHandle forcefully cancels the task

Sub-task errors must be handled, which means they must not be dropped without
being processed. To do this, the return value of the task must be observed
somehow. This is done using the JoinHandle returned from tokio::spawn. To
ensure the return value is handled, dropping JoinHandle results in the
associated task being forcefully canceled. JoinHandle is also annotated with
#[must_use].

However, there are cases in which the caller does not need the return value. For
example, take the TcpListener accept loop pattern:

loop {
    let (stream, _) = listener.accept().await?;

    tokio::spawn(async move {
        process(stream).await
    });
}

In this case, the caller does not need to track each JoinHandle. To handle
these cases, JoinHandle gains two new functions (naming TBD):

impl JoinHandle<()> {
    fn background(self) { ... }
}

impl<E> JoinHandle<Result<(), E>> {
    fn try_background(self) { ... }
}

In the first case, the type signature indicates the task can only fail due to a
panic. In the second, the task may fail due to Err being returned by the task.
When a task that has been moved to the scope's "background" fails due to a
panic or an Err return, the scope is forcibly canceled, resulting in any
outstanding tasks associated with the scope being forcibly canceled.
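As a rough illustration of the try_background semantics described above, here is a synchronous, std-only model. `Scope`, `complete_background`, and the numeric task ids are hypothetical names for this sketch, not the proposed API:

```rust
// Synchronous model: a background task completing with Err force-cancels
// its owning scope and, with it, every other outstanding task.
struct Scope {
    cancelled: bool,
    live_tasks: Vec<u64>,
}

impl Scope {
    fn complete_background(&mut self, id: u64, result: Result<(), &'static str>) {
        self.live_tasks.retain(|&t| t != id);
        if result.is_err() {
            // try_background semantics: an Err cancels the whole scope.
            self.cancelled = true;
            self.live_tasks.clear();
        }
    }
}

fn demo() -> (bool, usize) {
    let mut scope = Scope { cancelled: false, live_tasks: vec![1, 2, 3] };
    scope.complete_background(1, Ok(()));      // task 1 finishes cleanly
    scope.complete_background(2, Err("boom")); // task 2 fails: scope cancelled
    (scope.cancelled, scope.live_tasks.len())
}

fn main() {
    assert_eq!(demo(), (true, 0));
}
```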

Now, the TcpListener accept loop becomes:

loop {
    let (stream, _) = listener.accept().await?;

    tokio::spawn(async move {
        process(stream).await
    }).try_background();
}

Dropping a JoinHandle does not mean the task is terminated

An important detail to note: dropping JoinHandle will result in task
cancellation as soon as possible, but asynchronously. This means that there is
no guarantee that, when JoinHandle is dropped, the associated task has
terminated. Instead, the guarantee is that the task will be terminated by the
time the scope that spawned the task completes.

For example, the following is not guaranteed to work:

struct UnsetOnDrop(Arc<AtomicBool>);

impl Drop for UnsetOnDrop {
    fn drop(&mut self) {
        self.0.compare_and_swap(true, false, SeqCst);
    }
}

let flag = Arc::new(AtomicBool::new(false));

loop {
    assert!(!flag.swap(true, SeqCst));

    let drop_guard = UnsetOnDrop(flag.clone());

    let jh = tokio::spawn(async move {
        some_work().await;

        drop(drop_guard);
    });

    // Does not wait for the spawned task to be fully dropped
    drop(jh);
}

Instead, to make this work, the join handle would need to be awaited:

let flag = Arc::new(AtomicBool::new(false));

loop {
    assert!(!flag.swap(true, SeqCst));

    let drop_guard = UnsetOnDrop(flag.clone());

    let jh = tokio::spawn(async move {
        some_work().await;

        drop(drop_guard);
    });

    // Force cancel the task
    jh.cancel();

    // Wait for completion
    jh.await;
}

This makes things a bit tricky when paired with async constructs that leverage
drop for cancellation. For example, this snippet would not work:

let flag = Arc::new(AtomicBool::new(false));

loop {
    // Will probably panic after an iteration
    assert!(!flag.swap(true, SeqCst));

    tokio::select! {
        _ = async {
            let drop_guard = UnsetOnDrop(flag.clone());

            tokio::spawn(async move {
                some_work().await;

                drop(drop_guard);
            }).await;
        } => {}
        // Always ready branch
        _ = future::ready(()) => {}
    }
}

Again, this is because the only guarantee is that sub-tasks are terminated when
the containing scope (the task that spawned them) terminates. This code snippet
requires the ability to guarantee the task has completed mid-scope. The only
way to do this is to .await the JoinHandle.

Nesting scopes

Usually, structured concurrency describes a "tree" of tasks. A scope has N
associated tasks. Each one of those tasks may have M sub-tasks, etc. When an
error happens at any level, all descendant tasks are canceled, and the error
propagates up through ancestors until it is handled.

Instead of explicitly linking scopes, Rust's ownership system is used. A
scope is nested by virtue of the return value of task::scope (Scope) being held in a
task owned by a parent scope. If a Scope is dropped it forcefully cancels all
associated sub tasks. This, in turn, results in the sub tasks being dropped and
any Scope values held by the sub task to be dropped.

However, Scope::drop must be synchronous, yet cancelling a task and waiting
for all sub-tasks to drop is asynchronous. This is handled as follows:

Every Scope has a "wait set": the set of tasks that the scope needs to wait on
when it is .awaited.

When Scope is dropped:

  • All sub tasks are forcefully cancelled (they are not dropped yet).
  • The current scope context is inspected to find the containing scope.
  • The dropped scope's wait set is added to the containing scope's wait set.

By doing this, containing_scope.await does not complete until all descendant
tasks are completely terminated.
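The wait-set merging on drop can be sketched as a synchronous, std-only model. The `Scope` type, `drop_into`, and the task ids here are illustrative names, not the proposed API:

```rust
// Synchronous model of the wait-set bookkeeping: dropping a nested scope
// force-cancels its tasks (not shown) and folds its wait set into the
// containing scope's, so the containing scope's `.await` waits on them all.
#[derive(Default)]
struct Scope {
    wait_set: Vec<u64>, // ids of tasks this scope must wait on when .awaited
}

impl Scope {
    fn spawn(&mut self, id: u64) {
        self.wait_set.push(id);
    }

    // Models Scope::drop: the inner scope's wait set moves to the parent.
    fn drop_into(mut self, parent: &mut Scope) {
        parent.wait_set.append(&mut self.wait_set);
    }
}

fn demo() -> usize {
    let mut outer = Scope::default();
    let mut inner = Scope::default();
    outer.spawn(3);
    inner.spawn(1);
    inner.spawn(2);
    inner.drop_into(&mut outer);
    outer.wait_set.len() // outer's .await now waits on all three tasks
}

fn main() {
    assert_eq!(demo(), 3);
}
```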

If a scope block terminates early due to an unhandled sub task error,
task::scope(async { ... }).await completes with an error. In this case, the
task either handles the error or completes the task with an error. In the latter
case, the task's containing scope will receive the error and the process
repeats up the task hierarchy until the error is handled.

spawn_blocking

In order to maintain structured concurrency, spawn_blocking would need to
behave the same as spawn. The parent spawner would not be able to complete
until all spawn_blocking tasks complete. Unfortunately, as it is not possible
to forcibly cancel blocking tasks, this could result in the parent task taking
an indefinite amount of time before completing.

This limitation will have to be documented with spawn_blocking. Ideally, we
could come up with mechanisms to limit this impact.
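One possible mitigation, purely an assumption on my part and not part of this proposal: blocking work could poll a shared cancellation flag so that a scope being torn down can at least bound how long it waits. A std-thread sketch:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Blocking work that cooperatively checks a cancellation flag between
// units of work; names here are illustrative, not Tokio API.
fn blocking_work(cancelled: &AtomicBool) -> u32 {
    let mut processed = 0;
    for _ in 0..1_000 {
        if cancelled.load(Ordering::Relaxed) {
            break; // cooperative exit point for an otherwise blocking loop
        }
        processed += 1;
    }
    processed
}

fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    cancelled.store(true, Ordering::Relaxed); // scope teardown sets the flag
    let c = cancelled.clone();
    let handle = thread::spawn(move || blocking_work(&c));
    assert_eq!(handle.join().unwrap(), 0); // work observed the flag and bailed
}
```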

Graceful task cancellation

Graceful task cancellation requires "signalling" a task, then allowing the task
to terminate on its own time. To do this, JoinHandle gains a new function,
signal. This sends a signal to the task. While this signal could be used for
any purpose, it is implied to indicate graceful cancellation.

From the spawned task, the signal is received using: task::signalled().await.
Putting it all together:

let join_handle = tokio::spawn(async {
    tokio::select! {
        _ = do_work() => {}
        _ = task::signalled() => {
            // Do cleanup work here
        }
    }
});

join_handle.signal();
join_handle.await;
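The signal/signalled pair has a straightforward synchronous analogue, sketched here with std channels and threads (the "task" is a thread, and `worker` is a hypothetical name; this is not the proposed API):

```rust
use std::sync::mpsc;
use std::thread;

// The worker does work until it observes the signal, then takes the
// graceful-cancellation path and reports how it exited.
fn worker(signal: mpsc::Receiver<()>) -> &'static str {
    loop {
        // Stand-in for one unit of work between cancellation checks.
        if signal.try_recv().is_ok() {
            return "cleaned up"; // graceful-cancellation path
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || worker(rx));
    tx.send(()).unwrap(); // analogue of join_handle.signal()
    assert_eq!(handle.join().unwrap(), "cleaned up");
}
```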

Initial steps

The majority of this work can be done without any breaking changes. As an
initial step, all behavior in Tokio 0.2 would remain as is. Tasks would not
get an implicit scope. Instead, task::scope(...) must always be called.
Secondly, there would be tokio::spawn_scoped(...) which would spawn tasks
within the current scope. In 0.3, spawn_scoped would become tokio::spawn.

@carllerche carllerche added C-enhancement Category: A PR with an enhancement or bugfix. C-proposal Category: a proposal and request for comments A-tokio Area: The main tokio crate labels Jun 7, 2020
@njsmith

njsmith commented Jun 24, 2020

To me, the most fundamental idea in structured concurrency is that functions should work as abstractions: you should be able to tell by looking at a call site whether a function can spawn work that outlives the call, without inspecting the body of the function.

Having implicit scopes breaks this property.

This design still gives stronger guarantees than the traditional approach where you spawn tasks with unbounded lifetimes all over the place. But it's still much weaker than if you removed implicit scopes entirely. There's a lot here that reminds me of ZIO, so I'll link to this discussion of ZIO on the structured concurrency forum instead of repeating it :-) https://trio.discourse.group/t/zio-scala-library/72

@davidbarsky
Member

@njsmith Thanks for commenting on this, Nathaniel! I read through the Zio thread (as well as your posts on structured concurrency 🙂), but I'm unsure how implicit scopes (in this proposal, at least) break the abstractions offered by structured concurrency. For instance, the implicit scope created by tokio::spawn in the example below

task::scope(async {

  let join_handle = tokio::spawn(async {
    // do work
  });

}).await;

...shouldn't be able to outlive task::scope. I might be missing something important, though!

@njsmith

njsmith commented Jun 24, 2020

It can't outlive task::scope, but I'm thinking about stuff like:

fn looks_innocent() {
    tokio::spawn(...);
}

// then elsewhere...

async fn some_other_fn() {
    looks_innocent();
    // is looks_innocent still running? how can I know?
}

The basic unit of abstraction/encapsulation is a function call/stack frame. A task is a larger unit, composed of multiple stack frames. So if you have an implicit task-global scope, that can leak child tasks across different functions within the same parent task.

@carllerche
Member Author

@njsmith

Thanks for the reply.

I agree with you that having task::scope be explicit would be much stronger and I wish there was a clean way to provide it. The problem, as I see it, is that it just isn't possible to provide a task::scope that works where users would expect it.

To reuse your example, I would assume that you would want to do this:

async fn looks_innocent() {
   task::scope(async {
       tokio::spawn(...);
    }).await;
}

// then elsewhere...

async fn some_other_fn() {
    looks_innocent().await
}

The problem is that async Rust is heavily geared towards using drop to cancel, but drop must happen immediately (i.e. not block and not be async).

Imagine you do the following:

tokio::select! {
    _ = looks_innocent() => { ... }
    _ = time::delay_for(ms(1)) => { ... }
}

// No way to guarantee all tasks spawned by `looks_innocent()` completes

if the delay elapses first, the async block running looks_innocent() will be immediately dropped. There is no way to wait for all tasks spawned by looks_innocent() to complete.

@Matthias247 approached this problem by having task::scope panic if dropped when tasks are still pending.

The panic on drop strategy leads to a separate set of problems.

  1. Double panic -> abort

If code inside of a scope panics, the scope must be immediately torn down. Since we are panicking, the task::scope itself cannot panic again or it leads to an abort. However, if we don't abort, tasks will leak.

  2. A number of Tokio's primitives (such as select!) do not compose with task::scope. You will look at a piece of code and not know if it is safe to use a scope or select!. Introducing mutually non-compatible features is not an option IMO.

tl;dr, I really wish there was a clean way to introduce task::scope, but I don't see a way to do so that would be compatible with Tokio's existing feature set and the greater async Rust ecosystem.

@hgomersall
Contributor

hgomersall commented Jun 29, 2020

Can the tasks not be explicitly passed back from select! where they can be cleaned up manually?

Something like:

let tasks = tokio::select! {
    _ = looks_innocent() => { ... }
    _ = time::delay_for(ms(1)) => { ... }
};

tasks.async_drop().await?

With each task implementing the AsyncDrop trait, which by default will panic. async_drop() would block completion of the scope until each sub task has completed (through a signal say).

Of course, I've probably missed something fundamental in my assertions, in which case I apologise.

@davidbarsky
Member

@hgomersall Drop, as well as an often-discussed but not-yet-RFC'd-or-implemented AsyncDrop, need to be language items, which have special hooks into the compiler. There's no way to implement Drop-style hooks as a library. I suspect that if/when AsyncDrop lands in the language, something like what you describe might become possible.

@hgomersall
Contributor

hgomersall commented Jun 29, 2020

@davidbarsky the point was less about some hypothetical language feature and more to posit how an explicit library call at the correct location might allow behaviour different to simply panicking.

Edit: I was pondering the issue raised about needing to be compatible with the existing feature set.

@carllerche
Member Author

@hgomersall select! was just used as an example. The "drop a future" pattern is used extensively throughout the ecosystem.

@hgomersall
Contributor

@carllerche sure, it wasn't meant to be a solution as such, more thinking around whether forcing the user to do things explicitly would be a valid way to complement the existing API. Can structured concurrency be added on to what exists with some slightly warty API additions...?

@carllerche
Member Author

There currently is no obvious way forward.

As @Matthias247 pointed out, one should be able to establish scopes at any point. However, there is no way to enforce the scope without blocking the thread. The best we can do towards enforcing the scope is to panic when it is used "incorrectly". This is the strategy @Matthias247 has taken in his PRs. However, ad hoc dropping is currently a key async Rust pattern. I think this prohibits panicking when dropping a scope that isn't 100% complete. If we did this, using a scope within a select! would lead to panics.

We are at an impasse. Maybe if AsyncDrop lands in Rust then we can investigate this again. Until then, we have no way forward, so I will close this. It is definitely an unfortunate outcome.

@stevemk14ebr

stevemk14ebr commented Oct 14, 2020

Hi, I am sorry to post here as this is probably just noise, but this feature would be really useful, and I hope that maybe a 'simple'-case version can be developed to support a case such as this:

pub async fn insert_from_csv(&self, handle: &IndexHandle, csvs: &HashSet<PathBuf>) -> Result<()> {
    // iterator of csv files -> Vec<feature>
    let feature_iter = csvs.iter().filter_map(|file| {
        ... files -> features struct ...
        Some(features)
    });

    // index Vec<Features> to one DB file
    let index_future = async {
        for features in feature_iter.clone() { 
            let _ignore = self.insert_index_feature(&handle, &features).await;
        }
    };

    // index Vec<Features> to second, independent file
    let search_future = async {
        let mut search_index_writer = handle.search_vals.index.writer(100_000_000).unwrap();
        for features in feature_iter.clone() { 
            let _ignore = self.insert_search_feature(&handle, &mut search_index_writer, &features).await;
        }
        let _ = search_index_writer.commit();
    };

    join!(search_future, index_future);
    Ok(())
}

What you see here is a parent function which is given a list of files; it parses them and forms an iterator which filter_maps file contents into a features struct. This iterator is then cloned() into two independent async blocks which read each value and write the contents to databases. The problem is that the typical tokio spawn method requires self to be 'static, as well as the other arguments to the parent function. However, this isn't needed at all, because both futures are joined before the scope exits, so it's not possible for these tasks to reference stale values: the scope lives longer than the futures!

I admit I don't understand all the considerations of this feature that may be blocking a general solution, but a solution which fixes this simpler case would be really amazing, as I cannot figure out how to express something which is, as the literature calls it, 'embarrassingly parallel'.

@Darksonn
Contributor

@stevemk14ebr Even with the changes that were proposed here, the tokio::spawn method would still need the 'static bound. It's impossible to do anything about this due to mem::forget.
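A minimal illustration of the mem::forget problem: any guarantee built on a guard's Drop can be skipped by safe code, which is why a library cannot soundly tie spawned tasks to a borrow. The `Guard` type and `forget_guard` are hypothetical names for this sketch:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// A guard whose Drop marks the "scope" as closed. If a scoped-spawn API
// relied on this Drop running, mem::forget would silently defeat it.
struct Guard(Arc<AtomicBool>);

impl Drop for Guard {
    fn drop(&mut self) {
        self.0.store(true, Ordering::SeqCst); // "scope closed" marker
    }
}

fn forget_guard() -> bool {
    let closed = Arc::new(AtomicBool::new(false));
    let guard = Guard(closed.clone());
    std::mem::forget(guard); // safe Rust: Drop never runs
    closed.load(Ordering::SeqCst)
}

fn main() {
    assert!(!forget_guard()); // the cleanup was skipped entirely
}
```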

@stevemk14ebr

stevemk14ebr commented Oct 14, 2020

I was able to sort of solve this in a weird way using crossbeam::thread::scope, but I have to make an async runtime per thread. So this seems like it's possible somehow, in a roundabout way?

thread::scope(|scope| {
    let hThread1 = scope.spawn(|_| {
        let mut rt = Runtime::new().unwrap();
        let search_future = async {
            let mut search_index_writer = handle.search_vals.index.writer(100_000_000).unwrap();
            for features in feature_iter.clone() { 
                let _ignore = self.insert_search_feature(&handle, &mut search_index_writer, &features).await;
            }
            let _ = search_index_writer.commit();
        };
        rt.block_on(search_future);
    });
    let hThread2 = scope.spawn(|_| {
        let mut rt = Runtime::new().unwrap();
        let index_future = async {
            for features in feature_iter.clone() { 
                let _ignore = self.insert_index_feature(&handle, &features).await;
            }
        };
        rt.block_on(index_future);
    });
    hThread1.join();
    hThread2.join();
}).unwrap();

Note that this requires zero moves or lifetime markings; it just works as I would expect.

@Darksonn
Contributor

That blocks the thread running the insert_from_csv function. Don't do that, it will prevent other tasks running on the runtime from executing.

@stevemk14ebr

Oh, well that's unfortunate :/ Anyways, thanks. I hope something can be done eventually.

mrkline added a commit to mrkline/backpak that referenced this issue Dec 27, 2021
Tokio - and Rust's async model in general - is pretty freaking cool, but
it isn't a perfect fit for everything. After hammering for a few days,
I'm pretty confident that it's not working out here:

- There's no way to enforce scoped async tasks without blocking the
  current thread.[1][2][3] This means that there's no async task
  equivalent to Rayon/Crossbeam-like scopes, and you're back to arcs and
  pins and all sorts of fun boilerplate if you'd like to foster
  parallelism with task::spawn().

- Traits and recursive calls need lots o' boxes, implemented by proc
  macros at best and by hand at worst.

- Since many FS syscalls block, tokio asyncifies them by wrapping each
  in a spawn_blocking(), which spawns a dedicated thread. Of course you
  can wrap chunks of synchronous file I/O in spawn_blocking() if kicking
  off a separate thread for each File::open() call doesn't sound fun,
  but that means you can't interact with async code anywhere inside...

- Add in the usual sprinkling of async/await/join/etc. throughout the
  code base - since anything that awaits needs to be a future itself,
  async code has a habit of bubbling up the stack.

None of these are dealbreakers by themselves, but they can add up to
real overheads. Not just cognitive, but performance too, especially if
you've already got a design with concurrent tasks that do a decent job
of saturating I/O and farming CPU-heavy work out to a thread pool.
< gestures around >

[1]: tokio-rs/tokio#1879
[2]: tokio-rs/tokio#2596
[3]: https://docs.rs/async-scoped/latest/async_scoped/struct.Scope.html#safety-3