norbjd

Reputation: 11237

Rust: futures::future::join_all(...).await runs futures sequentially, but I want parallelism

I have the following async function (the implementation is not important):

async fn long_task(task_number: i32) {
    // do some long work
    println!("Task {} executed", task_number);
}

I want to run this function n times concurrently, so I defined this function:

async fn execute_long_tasks_async(n: i32) {
    let mut futures = Vec::new();
    for i in 1..=n {
        futures.push(long_task(i));
    }
    futures::future::join_all(futures).await;
}

I'm using the join_all function to wait until all tasks have executed. I then call this function from main:

fn main() {
    futures::executor::block_on(execute_long_tasks_async(3));
}

My issue is that the tasks run sequentially:

Executing task 1
Task 1 executed
Executing task 2
Task 2 executed
Executing task 3
Task 3 executed

But I would have expected them to run concurrently, giving something like:

Executing task 1
Executing task 3
Executing task 2
Task 1 executed
Task 3 executed
Task 2 executed

Is there an alternative to futures::future::join_all that runs all tasks in parallel?

I'd like to use await to create a simple example demonstrating async and await.

Upvotes: 8

Views: 6145

Answers (1)

Matthias247

Reputation: 10396

join_all runs tasks concurrently (not in parallel). Its limitation is that it can only switch between tasks when they yield. In addition to this, it will always prefer to work on the first task as long as that one has not completed.

If your function is defined, for example, as

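use std::time::Duration;

async fn long_task(task_number: i32) {
    println!("Executing Task {}", task_number);
    // Non-blocking sleep; needs a Tokio runtime.
    // This was `delay_for` in tokio 0.2; it is called `sleep` in tokio 1.x.
    tokio::time::sleep(Duration::from_secs(1)).await;
    println!("Task {} executed", task_number);
}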

then the await (suspension) point in the middle of the function gives join_all an opportunity to run the other futures, and you would observe the expected output.

However, if long_task does not yield (e.g. because it blocks the thread), it runs to completion before the other tasks start:

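use std::time::Duration;

async fn long_task(task_number: i32) {
    println!("Executing Task {}", task_number);
    // Blocks the whole thread: there is no await here, so the task never yields.
    std::thread::sleep(Duration::from_secs(1));
    println!("Task {} executed", task_number);
}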

Functions like this are generally not a good fit for the async world. Async functions are not meant to be CPU-intensive and should not block the thread, so that other tasks scheduled on the same executor can still run.
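To make the difference between a yielding and a non-yielding task concrete, here is a minimal std-only sketch. It is not how join_all is implemented; it only mimics the idea of polling every unfinished future in order on one thread. YieldNow, NoopWaker, Log and run_all are hypothetical names made up for this illustration, and the yields flag stands in for "the task contains an await that actually suspends".

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: returns `Pending` on the first poll, `Ready` on the second.
struct YieldNow(bool);

impl Future for YieldNow {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Waker that does nothing; the loop in `run_all` simply re-polls everything.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

type Log = Arc<Mutex<Vec<String>>>;

async fn long_task(n: i32, yields: bool, log: Log) {
    log.lock().unwrap().push(format!("Executing task {n}"));
    if yields {
        YieldNow(false).await; // suspension point: other tasks can run here
    }
    log.lock().unwrap().push(format!("Task {n} executed"));
}

/// Polls all unfinished tasks in order, on the current thread, until none are left.
fn run_all(yields: bool) -> Vec<String> {
    let log: Log = Arc::new(Mutex::new(Vec::new()));
    let mut tasks: Vec<Pin<Box<dyn Future<Output = ()>>>> = Vec::new();
    for n in 1..=3 {
        tasks.push(Box::pin(long_task(n, yields, log.clone())));
    }
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    while !tasks.is_empty() {
        // Keep only the tasks that are still pending after this round.
        tasks.retain_mut(|fut| fut.as_mut().poll(&mut cx).is_pending());
    }
    let lines = log.lock().unwrap().clone();
    lines
}

fn main() {
    println!("with a suspension point:");
    for line in run_all(true) {
        println!("  {line}");
    }
    println!("without a suspension point:");
    for line in run_all(false) {
        println!("  {line}");
    }
}
```

With yields set to true, all three "Executing" lines appear before any "executed" line. With yields set to false, each task finishes before the next one starts, which is the sequential behaviour from the question.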

One alternative that might provide you a bit more parallelism is to use a multithreaded async runtime (e.g. tokio) and spawn each async function as a separate task. In that case the tasks could run on separate CPU cores and threads and wouldn't block each other that much. You can then use join_all on the collection of returned JoinHandles to wait for all tasks to complete.

Upvotes: 16
