Reputation: 7284
Following code is working, it can be tested in Playground
use std::{thread, time::Duration};
use rand::Rng;
fn main() {
let mut hiv = Vec::new();
let (sender, receiver) = crossbeam_channel::unbounded();
// make workers
for t in 0..5 {
println!("Make worker {}", t);
let receiver = receiver.clone(); // clone for this thread
let handler = thread::spawn(move || {
let mut rng = rand::thread_rng(); // each thread have one
loop {
let r = receiver.recv();
match r {
Ok(x) => {
let s = rng.gen_range(100..1000);
thread::sleep(Duration::from_millis(s));
println!("w={} r={} working={}", t, x, s);
},
_ => { println!("No more work for {} --- {:?}.", t, r); break},
}
}
});
hiv.push(handler);
}
// Generate jobs
for x in 0..10 {
sender.send(x).expect("all threads hung up :(");
}
drop(sender);
// wait for jobs to finish.
println!("Wait for all threads to finish.\n");
for h in hiv {
h.join().unwrap();
}
println!("join() done. Work Finish.");
}
My question is following :
Can I remove boilerplate code by using threadpool, rayon or some other Rust crate ?
I know that I could do my own implementation, but would like to know is there some crate with same functionality ?
From my research threadpool/rayon are useful when you "send" code and it is executed, but I have not found way to make N threads that will have some code/logic that they need to remember ?
Basic idea is in let mut rng = rand::thread_rng();
this is instance that each thread need to have on it own.
Also is there are some other problems with code, please point it out.
Upvotes: 2
Views: 707
Reputation: 984
Yes, you can use Rayon to eliminate a lot of that code and make the remaining code much more readable, as illustrated in this gist:
https://gist.github.com/BillBarnhill/db07af903cb3c3edb6e715d9cedae028
The worker pool model is not great in Rust, due to the ownership rules. As a result parallel iterators are often a better choice.
I forgot to address your main concern, per thread context, originally. You can see how to store per thread context using a ThreadLocal! in this answer:
https://stackoverflow.com/a/42656422/204343
I will try to come back and edit the code to reflect ThreadLocal! use as soon as I have more time.
The gist requires nightly because of thread_id_value, but that is all but stable and can be removed if needed.
The real catch is that the gist has timing, and compares main_new with main_original, with surprising results. Perhaps not so surprising, Rayon has good debug support.
On Debug build the timing output is:
main_new duration: 1.525667954s
main_original duration: 1.031234059s
You can see main_new takes almost 50% longer to run.
On release however main_new is a little faster:
main_new duration: 1.584190936s
main_original duration: 1.5851124s
A slimmed version of the gist is below, with only the new code.
#![feature(thread_id_value)]
use std::{thread, time::Duration, time::Instant};
use rand::Rng;
#[allow(unused_imports)]
use rayon::prelude::*;
fn do_work(x : u32) -> String {
let mut rng = rand::thread_rng(); // each thread have one
let s = rng.gen_range(100..1000);
let thread_id = thread::current().id();
let t = thread_id.as_u64();
thread::sleep(Duration::from_millis(s));
format!("w={} r={} working={}", t, x, s)
}
fn process_work_product(output : String) {
println!("{}", output);
}
fn main() {
// bit hacky, but lets set number of threads to 5
rayon::ThreadPoolBuilder::new()
.num_threads(4)
.build_global()
.unwrap();
let x = 0..10;
x.into_par_iter()
.map(do_work)
.for_each(process_work_product);
}
Upvotes: 2