Rtxx

Reputation: 73

How can I execute an action after each end of thread?

In Rust, I would like to run multiple tasks in parallel and, as each task finishes, run a follow-up task on the main thread. The tasks will finish at different times, and I don't want to wait for all of them before starting the follow-up work. I've tried spawning multiple threads from the main thread, but I then have to wait for all the threads to finish before doing anything else, or maybe I have misunderstood something.

    for handle in handles {
        handle.join().unwrap();
    }

How can I run a follow-up task on the main thread as each thread finishes, without blocking the main thread until all of them are done?

Here is a diagram of what I want to do:

Diagram

If I'm not clear, or if you have a better idea for handling my problem, feel free to tell me!

Upvotes: 0

Views: 475

Answers (1)

Sven Marnach

Reputation: 601451

Here's an example of how to implement this using FuturesUnordered and Tokio:

use futures::{stream::FuturesUnordered, StreamExt};
use tokio::time::sleep;
use std::{time::Duration, future::ready};

#[tokio::main]
async fn main() {
    let tasks = FuturesUnordered::new();
    tasks.push(some_task(1000));
    tasks.push(some_task(2000));
    tasks.push(some_task(500));
    tasks.push(some_task(1500));
    tasks.for_each(|result| {
        println!("Task finished after {} ms.", result);
        ready(())
    }).await;
}

async fn some_task(delay_ms: u64) -> u64 {
    sleep(Duration::from_millis(delay_ms)).await;
    delay_ms
}

If you run this code, you can see that the closure passed to for_each() is executed immediately whenever a task finishes, even though they don't finish in the order they were created.

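Note that futures pushed into a FuturesUnordered are all polled by the task that drives the stream, so in this example they run concurrently within a single task rather than on separate threads. That is fine for timer-based or I/O-bound work like this. If you want Tokio to schedule the work across its worker threads (by default, one per CPU core), wrap each future in tokio::spawn() and push the resulting join handles instead.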

To compile this, you need to add this to your Cargo.toml file:

[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }

If you want to add proper error propagation, the code becomes only slightly more complex; most of the added code is for the custom error type:

use futures::{stream::FuturesUnordered, TryStreamExt};
use tokio::time::sleep;
use std::{time::Duration, future::ready};

#[tokio::main]
async fn main() -> Result<(), MyError> {
    let tasks = FuturesUnordered::new();
    tasks.push(some_task(1000));
    tasks.push(some_task(2000));
    tasks.push(some_task(500));
    tasks.push(some_task(1500));
    tasks.try_for_each(|result| {
        println!("Task finished after {} ms.", result);
        ready(Ok(()))
    }).await
}

async fn some_task(delay_ms: u64) -> Result<u64, MyError> {
    sleep(Duration::from_millis(delay_ms)).await;
    Ok(delay_ms)
}

#[derive(Debug)]
struct MyError {}

impl std::fmt::Display for MyError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "MyError occurred")
    }
}

impl std::error::Error for MyError {}
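
If you would rather stay with plain OS threads, as in the question, one possible approach is to have each thread send its result over a std::sync::mpsc channel and let the main thread handle results as they arrive. The following is only a minimal sketch under that assumption: some_task here is a blocking stand-in that sleeps, and run_tasks is a hypothetical helper name introduced for illustration.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for real work: block for a while, then report the delay.
fn some_task(delay_ms: u64) -> u64 {
    thread::sleep(Duration::from_millis(delay_ms));
    delay_ms
}

// Spawns one thread per delay and returns the results in completion order.
fn run_tasks(delays: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for &delay_ms in delays {
        let tx = tx.clone();
        thread::spawn(move || {
            // A send error only means the receiver is gone; ignore it in this sketch.
            let _ = tx.send(some_task(delay_ms));
        });
    }
    // Drop the original sender so the loop below ends once every worker is done.
    drop(tx);

    let mut finished = Vec::new();
    // Runs on the calling thread, once per finished task, as results arrive.
    for result in rx {
        println!("Task finished after {} ms.", result);
        finished.push(result);
    }
    finished
}

fn main() {
    run_tasks(&[1000, 2000, 500, 1500]);
}
```

Iterating over the receiver blocks only until the next result is available, not until all threads have finished. If the main thread has other work to do in between, Receiver::try_recv() can be polled instead, since it returns immediately when no result is ready.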

Upvotes: 2
