Makogan

How do you spawn a repeating chain of tasks with Rayon's thread pool?

I am trying to parallelize something directly with Rayon's thread pool.

There are two steps to the idea. Assume first that you have three single tasks: A, B, and C.

I want to spawn tasks in the thread pool so that when A terminates it calls B, B calls C when it terminates, and finally C calls A, repeating the chain in a loop. I want to do this without ever despawning a thread.

The second step is to do the same, but with A, B, and C representing groups of tasks instead of individual ones. Say A is a group of 10K tasks operating on multiple containers at the same time, B is a group of 50K tasks doing something similar, and C is a single task; then the cycle repeats.

So far, all I have is the thread pool initialization and a dummy task spawned on it:

    
    let pool = rayon_core::ThreadPoolBuilder::default().build().unwrap();
    pool.spawn(
        move || {
            println!("dummy");
        }
    );

The main thing I am struggling with, because I don't see it in the documentation, is how to specify the looping behaviour, i.e. how to build the chain of tasks A -> B -> C and then how to extend that to groups of tasks.

Answers (1)

Kevin Reid

To sequence things the way you describe ("when A terminates it calls B"), you need some way of noticing when all of A's tasks have finished. The easiest way to do that is to spawn them in a scope, which does not return until all of its tasks have finished. Once you are doing that, you might as well run the scopes for A, B, and C one after another in a single function that runs as one long-lived task on the pool.

Here is a demonstration program. Note that inside the pool, rayon::scope is used rather than pool.scope(); it still runs on pool, because rayon::scope uses the pool of the thread it is called from.

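    fn main() {
        // Only the `rayon` crate is needed: it re-exports ThreadPoolBuilder from rayon_core.
        let pool = rayon::ThreadPoolBuilder::default().build().unwrap();
        pool.spawn(move || {
            for _ in 0..10 { // this would be `loop {` in the real program
                // Group A: rayon::scope returns only once all 10 A tasks have finished.
                rayon::scope(|scope| {
                    for _ in 0..10 {
                        scope.spawn(|_| println!("task A"));
                    }
                });

                // Group B starts only after every A task is done.
                rayon::scope(|scope| {
                    for _ in 0..10 {
                        scope.spawn(|_| println!("task B"));
                    }
                });

                // Group C starts only after every B task is done.
                rayon::scope(|scope| {
                    for _ in 0..10 {
                        scope.spawn(|_| println!("task C"));
                    }
                });
            }
        });

        // pool.spawn() returns immediately, and returning from main exits the
        // process (killing the pool's threads), so keep main alive for a bit.
        std::thread::sleep(std::time::Duration::from_secs(1));
    }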

And if you use Rayon's higher-level operations, like parallel iteration or join(), you don't need an explicit scope: the collect(), for_each(), or join() call itself waits for all of its child tasks to finish. The scope is only needed if you want to use spawn() specifically.
