Alex Ozdemir
Alex Ozdemir

Reputation: 606

Can a Tokio task terminate the whole runtime gracefully?

I start up a Tokio runtime with code like this:

tokio::run(my_future);

My future goes on to start a bunch of tasks in response to various conditions.

One of those tasks is responsible for determining when the program should shut down. However, I don't know how to have that task gracefully terminate the program. Ideally, I'd like to find a way for this task to cause the run function call to terminate.

Below is an example of the kind of program I would like to write:

extern crate tokio;

use tokio::prelude::*;

use std::time::Duration;
use std::time::Instant;

use tokio::timer::{Delay, Interval};

fn main() {
    let kill_future = Delay::new(Instant::now() + Duration::from_secs(3));

    let time_print_future = Interval::new_interval(Duration::from_secs(1));

    let mut runtime = tokio::runtime::Runtime::new().expect("failed to start new Runtime");
    runtime.spawn(time_print_future.for_each(|t| Ok(println!("{:?}", t))).map_err(|_| ()));
    runtime.spawn(
        kill_future
            .map_err(|_| {
                eprintln!("Timer error");
            })
            .map(move |()| {
                // TODO
                unimplemented!("Shutdown the runtime!");
            }),
    );
    // TODO
    unimplemented!("Block until the runtime is shutdown");
    println!("Done");
}

shutdown_now seems promising, but upon further investigation, it's probably not going to work. In particular, it takes ownership of the runtime, and Tokio is probably not going to allow both the main thread (where the runtime was created) and some random task to own the runtime.

Upvotes: 2

Views: 4215

Answers (1)

Shepmaster
Shepmaster

Reputation: 431379

You can use a oneshot channel to communicate from inside the runtime to outside. When the delay expires, we send a single message through the channel.

Outside of the runtime, once we receive that message we initiate a shutdown of the runtime and wait for it to finish.

use std::time::{Duration, Instant};
use tokio::{
    prelude::*,
    runtime::Runtime,
    sync::oneshot,
    timer::{Delay, Interval},
}; // 0.1.15

fn main() {
    let mut runtime = Runtime::new().expect("failed to start new Runtime");

    let (tx, rx) = oneshot::channel();

    runtime.spawn({
        let every_second = Interval::new_interval(Duration::from_secs(1));
        every_second
            .for_each(|t| Ok(println!("{:?}", t)))
            .map_err(drop)
    });

    runtime.spawn({
        let in_three_seconds = Delay::new(Instant::now() + Duration::from_secs(3));
        in_three_seconds
            .map_err(|_| eprintln!("Timer error"))
            .and_then(move |_| tx.send(()))
    });

    rx.wait().expect("unable to wait for receiver");
    runtime
        .shutdown_now()
        .wait()
        .expect("unable to wait for shutdown");

    println!("Done");
}

See also:

Upvotes: 3

Related Questions