dvreed77
dvreed77

Reputation: 2387

Rust waiting for Tokio threads to finish

I'm trying to create a little program that will kick off new threads whenever an MPSC channel receives a message. I'm able to get the program to kick off the threads, but can't figure out how to get to the program to stay open.

Here is the app:

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel(100);

    let handles = Arc::new(tokio::sync::Mutex::new(vec![]));
    let handles_clone = handles.clone();
    let consumer_task = tokio::spawn(async move {
        while let Some(item) = receiver.recv().await {
            println!("Consumer received: {}", item);
            let h = tokio::spawn(async move {
                println!("Thread {} working", item);
                sleep(Duration::from_secs(2)).await;
                println!("Thread {} done", item);
            });
            {
                let mut handles = handles_clone.lock().await;
                handles.push(h);
            }
        }
    });

    let producer_task = tokio::spawn(async move {
        for i in 0..5 {
            let _ = sender.send(i).await;
        }
    });

    let mut handles = handles.lock().await;
    handles.push(consumer_task);
    handles.push(producer_task);

    let a = handles.deref();

    join_all(a);

My thought is that I need to join_all on the handles that I'm kicking off, but am having trouble figuring out how I can push handles onto the vector inside the receiver thread, AND then join_all at the end of the program to block until those threads are done.

I wrapping the handles vec in an Arc<Mutex<_>> since I want to reference the handles vec inside the receiver thread, but I can't seem to join_all at the end, and get this error:

`&tokio::task::JoinHandle<()>` is not a future
the trait `futures::Future` is not implemented for `&tokio::task::JoinHandle<()>`
&tokio::task::JoinHandle<()> must be a future or must implement `IntoFuture` to be awaited
the trait `futures::Future` is implemented for `tokio::task::JoinHandle<T>`
`futures::Future` is implemented for `&mut tokio::task::JoinHandle<()>`, but not for `&tokio::task::JoinHandle<()>`

Any help would be greatly appreciated.

Upvotes: 1

Views: 1786

Answers (1)

Caesar
Caesar

Reputation: 8514

Do you see that & in the error message? The compiler error comes from the fact you don't own the JoinHandles when you call join_all (JoinHandles can only be joined once, and thus they need to be owned to do that.) You can fix that error by replacing the last two lines with:

join_all(handles.drain(..)).await;

but then you race into a deadlock: The consumer task will try to lock handles_clone, but that is locked by the "main" task, waiting in join_all, which will never finish. You could circumvent this problem by locking, popping handles, and awaiting one by one, but you'd essentially have implemented an inefficient (lifo) channel by doing so. It would be better to actually use a channel, then.

Better yet, though, to make your life easier by moving the handles Vec into your consumer task and joining there. That makes the mutex unnecessary.

let (sender, mut receiver) = mpsc::channel(100);

let consumer_task = tokio::spawn(async move {
    let mut handles = vec![];
    while let Some(item) = receiver.recv().await {
        println!("Consumer received: {}", item);
        let h = tokio::spawn(async move {
            println!("Thread {} working", item);
            sleep(Duration::from_secs(2)).await;
            println!("Thread {} done", item);
        });

        handles.push(h);
    }
    join_all(handles).await;
});

let producer_task = tokio::spawn(async move {
    for i in 0..5 {
        let _ = sender.send(i).await;
    }
});

consumer_task.await.unwrap();
producer_task.await.unwrap();

Upvotes: 2

Related Questions