How to terminate a blocking tokio task?

In my application I have a blocking task that synchronously reads messages from a queue and feeds them to a running task. All of this works fine, but the process does not terminate correctly, because the queue_reader task never stops.

I've constructed a small example based on the tokio documentation at: https://docs.rs/tokio/1.20.1/tokio/task/fn.spawn_blocking.html

use tokio::sync::mpsc;
use tokio::task;

#[tokio::main]
async fn main() {
    let (incoming_tx, mut incoming_rx) = mpsc::channel(2);
    // Some blocking task that never ends
    let queue_reader = task::spawn_blocking(move || {
        loop {
            // Stand in for receiving messages from queue
            incoming_tx.blocking_send(5).unwrap();
        }
    });

    let mut acc = 0;
    // Some complex condition that determines whether the job is done
    while acc < 95 {
        tokio::select! {
            Some(v) = incoming_rx.recv() => {
                acc += v;
            }
        }
    }
    assert_eq!(acc, 95);
    println!("Finalizing thread");
    queue_reader.abort(); // This doesn't seem to terminate the queue_reader task
    queue_reader.await.unwrap(); // <-- The process hangs on this task.
    println!("Done");
}

At first I expected queue_reader.abort() to terminate the task, but it doesn't. My guess is that tokio can only abort tasks that use .await internally, because each .await hands control back to tokio. Is this right?

In order to terminate the queue_reader task I introduced a oneshot channel, over which I signal the termination, as shown in the next snippet.

use tokio::task;
use tokio::sync::{oneshot, mpsc};

#[tokio::main]
async fn main() {
    let (incoming_tx, mut incoming_rx) = mpsc::channel(2);
    // A new channel to communicate when the process must finish.
    let (term_tx, mut term_rx) = oneshot::channel();
    // Some blocking task that never ends
    let queue_reader = task::spawn_blocking(move || {
        // As long as termination is not signalled
        while term_rx.try_recv().is_err() {
            // Stand in for receiving messages from queue
            incoming_tx.blocking_send(5).unwrap();
        }
    });

    let mut acc = 0;
    // Some complex condition that determines whether the job is done
    while acc < 95 {
        tokio::select! {
            Some(v) = incoming_rx.recv() => {
                acc += v;
            }
        }
    }
    assert_eq!(acc, 95);
    // Signal termination
    term_tx.send(()).unwrap();
    println!("Finalizing thread");
    queue_reader.await.unwrap();
    println!("Done");
}

My question is, is this the canonical/best way to do this, or are there better alternatives?

Upvotes: 3

Views: 3209

Answers (1)

Peter Hall

Reputation: 58765

Tokio cannot terminate CPU-bound/blocking tasks.

It is technically possible to kill OS threads, but it is generally a bad idea: creating replacement threads is expensive, and killing a thread can leave your program in an invalid state. Even if Tokio decided this was worth implementing, it would severely limit its implementation: it would be forced into a multithreaded model just to support the possibility that you'd want to kill a blocking task before it has finished.

Your solution is pretty good: give your blocking task the responsibility for terminating itself, and provide a way to tell it to do so. If this task were part of a library, you could abstract the mechanism away by returning a "handle" to the task that has a cancel() method.
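As a rough illustration of that "handle" idea, here is a minimal sketch. It makes several assumptions: std::thread and std::sync::mpsc stand in for task::spawn_blocking and tokio's mpsc so that it runs without dependencies, and QueueReaderHandle, spawn_queue_reader and sum_until are hypothetical names, not part of any library. The worker uses try_send rather than a blocking send so that it keeps re-checking the stop flag even when the channel is full and nobody is receiving any more.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical handle that owns a blocking worker and can ask it to stop.
pub struct QueueReaderHandle {
    stop: Arc<AtomicBool>,
    worker: JoinHandle<u64>,
}

impl QueueReaderHandle {
    /// Requests cancellation, waits for the worker to exit, and returns
    /// the number of messages the worker managed to send.
    pub fn cancel(self) -> u64 {
        self.stop.store(true, Ordering::Relaxed);
        self.worker.join().expect("worker panicked")
    }
}

/// Spawns the worker. Assumption: std::thread stands in for spawn_blocking.
pub fn spawn_queue_reader(capacity: usize) -> (QueueReaderHandle, Receiver<u32>) {
    let (tx, rx) = sync_channel(capacity);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let worker = thread::spawn(move || {
        let mut sent = 0u64;
        // The worker terminates itself once the flag is set.
        while !stop_flag.load(Ordering::Relaxed) {
            // Stand-in for receiving messages from the queue.
            match tx.try_send(5) {
                Ok(()) => sent += 1,
                // Channel full: back off briefly, then re-check the flag.
                Err(TrySendError::Full(_)) => thread::sleep(Duration::from_millis(1)),
                // Receiver dropped: nobody is listening, so stop.
                Err(TrySendError::Disconnected(_)) => break,
            }
        }
        sent
    });
    (QueueReaderHandle { stop, worker }, rx)
}

/// Consumes messages until the sum reaches `target`, then cancels the worker.
pub fn sum_until(target: u32) -> u32 {
    let (handle, rx) = spawn_queue_reader(2);
    let mut acc = 0;
    while acc < target {
        acc += rx.recv().expect("worker exited early");
    }
    handle.cancel();
    acc
}
```

Calling sum_until(95) mirrors the loop in the question. With Tokio the same shape should carry over: the closure passed to spawn_blocking checks the flag, and cancel() becomes an async method that awaits the JoinHandle.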

Are there better alternatives? Maybe, but that would depend on other factors. Your solution is good and easily extended, for example if you later needed to send different types of signal to the task.
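To show what "different types of signal" might look like, here is a small sketch that replaces the one-shot termination signal with a channel of control messages. It is again written against the standard library only, and the Control enum, its variants and spawn_worker are made-up names for illustration. Treating a disconnected control channel as a stop request also covers the case where the sender is dropped without ever sending anything.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::thread::{self, JoinHandle};

/// Hypothetical control messages; the variants are illustrative only.
pub enum Control {
    SetStep(u32),
    Stop,
}

/// Spawns a worker that polls the control channel between units of work
/// and returns the last step value it was given when it stops.
pub fn spawn_worker() -> (Sender<Control>, JoinHandle<u32>) {
    let (ctl_tx, ctl_rx) = channel();
    let worker = thread::spawn(move || {
        let mut step = 1u32;
        loop {
            match ctl_rx.try_recv() {
                Ok(Control::SetStep(s)) => step = s,
                Ok(Control::Stop) => break,
                // No pending signal: carry on working.
                Err(TryRecvError::Empty) => {}
                // Sender dropped: treat it like a stop request.
                Err(TryRecvError::Disconnected) => break,
            }
            // Stand-in for one unit of blocking work.
            thread::yield_now();
        }
        step
    });
    (ctl_tx, worker)
}
```

The caller keeps the Sender and sends Control::Stop when the job is done; new variants can be added without changing how the worker is spawned or joined.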

Upvotes: 2
