WebOrCode

Reputation: 7284

Rust threadpool with init code in each thread?

The following code works and can be tested in the Playground:

use std::{thread, time::Duration};
use rand::Rng;

fn main() {
    let mut hiv = Vec::new();
    let (sender, receiver) = crossbeam_channel::unbounded();
    
    // make workers
    for t in 0..5 {
        println!("Make worker {}", t);
        
        let receiver = receiver.clone();  // clone for this thread
        
        let handler = thread::spawn(move || {
            let mut rng = rand::thread_rng(); // each thread has its own
            
            loop {
                let r = receiver.recv();
                match r {
                    Ok(x) => {
                        let s = rng.gen_range(100..1000);
                
                        thread::sleep(Duration::from_millis(s));
                        println!("w={} r={} working={}", t, x, s);
                    },
                    Err(_) => {
                        println!("No more work for {} --- {:?}.", t, r);
                        break;
                    }
                }
            }
            
        });
        
        hiv.push(handler);
    }
    
    // Generate jobs
    for x in 0..10 {
        sender.send(x).expect("all threads hung up :(");
    }
    drop(sender);
    
    // wait for jobs to finish.
    println!("Wait for all threads to finish.\n");
    for h in hiv {
        h.join().unwrap();
    }
    println!("join() done. Work Finish.");
}

My question is:
Can I remove the boilerplate by using threadpool, rayon, or some other Rust crate?

I know I could write my own implementation, but I would like to know whether a crate with the same functionality already exists.

From my research, threadpool and rayon are useful when you "send" code to be executed, but I have not found a way to create N threads that each hold some state or logic they need to remember.

The basic idea is in let mut rng = rand::thread_rng(); this is an instance that each thread needs to have on its own.

Also, if there are other problems with the code, please point them out.

Upvotes: 2

Views: 707

Answers (1)

Bill Barnhill

Reputation: 984

Yes, you can use Rayon to eliminate a lot of that code and make the remaining code much more readable, as illustrated in this gist:

https://gist.github.com/BillBarnhill/db07af903cb3c3edb6e715d9cedae028

The worker pool model is awkward in Rust because of the ownership rules, so parallel iterators are often a better choice.

I originally forgot to address your main concern, per-thread context. You can see how to store per-thread context with the thread_local! macro in this answer:

https://stackoverflow.com/a/42656422/204343

I will try to come back and edit the code to use thread_local! as soon as I have more time.
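
A minimal sketch of per-thread context using only the standard library's thread_local! macro might look like the following. WorkerContext, CONTEXT, do_job and run are hypothetical names that do not come from the gist, and a plain counter stands in for whatever state each thread needs to remember:

```rust
use std::cell::RefCell;
use std::thread;

// Hypothetical per-thread context: a counter standing in for an RNG,
// a connection, or any other state each worker thread must own.
struct WorkerContext {
    jobs_done: u32,
}

thread_local! {
    // Initialized lazily, once per thread, on that thread's first access.
    static CONTEXT: RefCell<WorkerContext> = RefCell::new(WorkerContext { jobs_done: 0 });
}

// Runs one job with the calling thread's own context and returns how many
// jobs this thread has handled so far.
fn do_job(_x: u32) -> u32 {
    CONTEXT.with(|ctx| {
        let mut ctx = ctx.borrow_mut();
        ctx.jobs_done += 1;
        ctx.jobs_done
    })
}

// Spawns `workers` threads that each run `jobs_per_worker` jobs and returns
// every thread's final per-thread count.
fn run(workers: usize, jobs_per_worker: u32) -> Vec<u32> {
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            thread::spawn(move || {
                let mut last = 0;
                for x in 0..jobs_per_worker {
                    last = do_job(x);
                }
                last
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    println!("{:?}", run(5, 2));
}
```

Because the context is keyed by thread, the same do_job could in principle be called from a Rayon closure such as the one passed to .map(), with each pool thread getting its own CONTEXT.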

The gist requires a nightly compiler because it uses the unstable thread_id_value feature (ThreadId::as_u64). That call is only used to print the thread ID and can be removed if needed.
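
If the numeric ID is not essential, one way to stay on stable Rust is to print the ThreadId through its Debug implementation instead. This is only a sketch; thread_label is a hypothetical helper that is not part of the gist, and the exact text it produces (currently something like ThreadId(2)) is not a documented guarantee:

```rust
use std::thread;

// Returns a printable label for the current thread without calling the
// unstable `as_u64` method, relying on ThreadId's Debug output instead.
fn thread_label() -> String {
    format!("{:?}", thread::current().id())
}

fn main() {
    // Labels taken on different threads are expected to differ.
    let worker_label = thread::spawn(thread_label).join().unwrap();
    println!("main={} worker={}", thread_label(), worker_label);
}
```

In do_work, the format! call could then use thread_label() in place of t.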

The real catch is that the gist times main_new against main_original, with surprising results. Perhaps they are not so surprising, given that Rayon has good debug support.

On a debug build the timing output is:

main_new duration: 1.525667954s
main_original duration: 1.031234059s

You can see that main_new takes about 48% longer to run.

On a release build, however, main_new is marginally faster (by about 1 ms):

main_new duration: 1.584190936s
main_original duration: 1.5851124s
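
Durations in this format can be collected with std::time::Instant. The helper below is a sketch of one way to do it, not the gist's actual timing code; timed is a hypothetical name, and a short sleep stands in for main_new or main_original:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical helper: runs `f`, prints how long it took, and returns the
// elapsed time so callers can compare runs.
fn timed<F: FnOnce()>(label: &str, f: F) -> Duration {
    let start = Instant::now();
    f();
    let elapsed = start.elapsed();
    println!("{} duration: {:?}", label, elapsed);
    elapsed
}

fn main() {
    // Stand-in workload; in the gist this would be main_new or main_original.
    timed("sleep_50ms", || thread::sleep(Duration::from_millis(50)));
}
```

Since every job sleeps for a random 100 to 1000 ms, totals measured this way will vary from run to run regardless of build profile.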

A slimmed-down version of the gist is below, with only the new code.

#![feature(thread_id_value)]

use std::{thread, time::Duration};
use rand::Rng;

#[allow(unused_imports)]
use rayon::prelude::*;

fn do_work(x: u32) -> String {
    // thread_rng() returns a handle to this thread's own lazily initialized RNG
    let mut rng = rand::thread_rng();
    let s = rng.gen_range(100..1000);
    let thread_id = thread::current().id();
    
    let t = thread_id.as_u64();
    
    thread::sleep(Duration::from_millis(s));
    format!("w={} r={} working={}", t, x, s)
}

fn process_work_product(output: String) {
    println!("{}", output);
}

fn main() {
    // A bit hacky, but let's set the number of threads to 5, as in the original
    rayon::ThreadPoolBuilder::new()
        .num_threads(5)
        .build_global()
        .unwrap();
    
    let x = 0..10;
    x.into_par_iter()
        .map(do_work)
        .for_each(process_work_product);
}

Upvotes: 2
