Reputation: 32715
What are some good ways to adapt this Barrier example to handle two differences:
the number of items is not known in advance (for example, when splitting a large file into lines)
the thread handles are not tracked (e.g. without using the handles vector in the example below). The motivation is that tracking them adds overhead.
Example code:
use std::sync::{Arc, Barrier};
use std::thread;

let mut handles = Vec::with_capacity(10);
let barrier = Arc::new(Barrier::new(10));
for _ in 0..10 {
    let c = barrier.clone();
    handles.push(thread::spawn(move || {
        // do some work
        c.wait();
    }));
}
// Wait for other threads to finish.
for handle in handles {
    handle.join().unwrap();
}
Code snippet is adapted slightly from the Barrier docs.
The first thing that crossed my mind was to mutate the inner value of the Barrier (if possible); however, the API does not provide mutable access to the num_threads field of the Barrier struct.
Another idea would be to not use the Barrier and instead write custom logic with AtomicUsize.
I'm open to learning the most ergonomic / idiomatic ways to do this in Rust.
Upvotes: 7
Views: 12113
Reputation: 419
Have each thread send its result (or just thread::current()) over a multiple-producer, single-consumer channel that the waiting parent thread consumes. The standard library provides an mpsc channel for this purpose (std::sync::mpsc). It's a lot less CPU-intensive than a spinlock.
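A minimal sketch of this approach, assuming one thread per item and an iterator of owned lines as input. The helper name `process_all` and the "work" (measuring the line length) are made up for illustration. The parent drops its own sender after spawning, so iterating over the receiver ends once every worker has dropped its clone, whether it finished normally or panicked:

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns one thread per item and waits for all of them without storing
/// join handles. `process_all` is a hypothetical helper, not a std API.
fn process_all<I>(items: I) -> usize
where
    I: IntoIterator<Item = String>,
{
    let (tx, rx) = mpsc::channel::<usize>();

    // The number of items does not need to be known up front.
    for item in items {
        let tx = tx.clone();
        thread::spawn(move || {
            // do some work; here the "result" is just the line length
            let _ = tx.send(item.len());
            // `tx` is dropped when the closure ends, even if the work panics
        });
    }

    // Drop the parent's sender so the channel closes once all workers are done.
    drop(tx);

    // Blocks without busy-waiting until every sender has been dropped,
    // and counts how many workers reported a result.
    rx.iter().count()
}

fn main() {
    let text = "first line\nsecond line\nthird line";
    let finished = process_all(text.lines().map(String::from));
    println!("{} workers finished", finished);
}
```

Spawning one OS thread per line is only reasonable for small inputs; the same channel pattern should carry over to a fixed pool of workers.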
Upvotes: 3
Reputation: 3861
You can spin on an atomic counter to wait for all threads to exit. Of course, instead of using a static atomic, you can pass an Arc<AtomicUsize> into each thread.
Ordering::SeqCst is probably too strong, but concurrent programming is hard, and I'm not sure how this ordering can be relaxed.
While it can be done this way, the cost of creating threads will probably dwarf a micro-optimization like this. It's also worth considering that busy-waiting can decrease the performance of a program.
use std::panic;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

// AtomicUsize::new is a const fn, so it replaces the deprecated ATOMIC_USIZE_INIT
static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);

fn main() {
    for i in 0..10 {
        // mark that the thread is about to run
        // we need to do it in the main thread to prevent spurious exits
        GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::SeqCst);
        thread::spawn(move || {
            // We need to catch panics to reliably signal exit of a thread
            let result = panic::catch_unwind(move || {
                // do some work
                println!("{}-th thread reporting", i + 1);
            });
            // process errors
            match result {
                _ => {}
            }
            // signal thread exit
            GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::SeqCst);
        });
    }
    // Wait for other threads to finish.
    while GLOBAL_THREAD_COUNT.load(Ordering::SeqCst) != 0 {
        thread::sleep(Duration::from_millis(1));
    }
}
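The Arc<AtomicUsize> variant mentioned above could look roughly like the sketch below. The names `ExitGuard` and `run_workers` are hypothetical. A drop guard stands in for `catch_unwind`, since destructors also run while a panic unwinds. The weaker orderings are my own assumption rather than something this answer establishes: a Release decrement paired with an Acquire load should be enough for the waiting thread to see the workers' writes, but fall back to SeqCst if in doubt.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical guard: decrements the shared counter when dropped,
/// which also happens while a panic unwinds the worker's stack.
struct ExitGuard(Arc<AtomicUsize>);

impl Drop for ExitGuard {
    fn drop(&mut self) {
        // Release (assumed sufficient): publishes this thread's writes to
        // whoever later observes the decrement with an Acquire load.
        self.0.fetch_sub(1, Ordering::Release);
    }
}

/// Spawns `n` workers, waits for all of them, and returns the final count.
fn run_workers(n: usize) -> usize {
    let running = Arc::new(AtomicUsize::new(0));

    for i in 0..n {
        // Increment in the parent before spawning, so the wait loop cannot
        // observe 0 while a thread is still about to start.
        running.fetch_add(1, Ordering::Relaxed);
        let guard = ExitGuard(Arc::clone(&running));
        thread::spawn(move || {
            let _guard = guard; // dropped on normal exit or on panic
            // do some work
            println!("{}-th thread reporting", i + 1);
        });
    }

    // Wait for the workers; sleeping keeps the loop from burning a core.
    while running.load(Ordering::Acquire) != 0 {
        thread::sleep(Duration::from_millis(1));
    }
    running.load(Ordering::Acquire)
}

fn main() {
    let remaining = run_workers(10);
    println!("threads still running: {}", remaining);
}
```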
Upvotes: 3