Reputation: 1493
I am new to Rust and struggling with all of its wrapper types. I am trying to write code semantically equivalent to the following C code: it creates one big bookkeeping table, but divides that table so that each thread only accesses its own small slice. The big table is not touched again until the other threads have quit and no longer access their slices.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

void* write_slice(void* arg) {
    int* slice = (int*) arg;
    int i;
    for (i = 0; i < 10; i++)
        slice[i] = i;
    return NULL;
}

int main()
{
    int* table = (int*) malloc(100 * sizeof(int));
    int* slice[10];
    int i;
    for (i = 0; i < 10; i++) {
        slice[i] = table + i * 10;
    }
    // create a pthread for each slice
    pthread_t p[10];
    for (i = 0; i < 10; i++)
        pthread_create(&p[i], NULL, write_slice, slice[i]);
    for (i = 0; i < 10; i++)
        pthread_join(p[i], NULL);
    for (i = 0; i < 100; i++)
        printf("%d,", table[i]);
    free(table);
}
How do I use Rust's types and ownership to achieve this?
Upvotes: 12
Views: 5372
Reputation: 1
I have a naive approach to threading over disjoint subsets of a vector. At a high level it works in waves: for each distance i, select as many non-overlapping pairs of elements as possible, spawn one scoped thread per pair, send each element to its thread over a channel, join the whole wave, and repeat until every pair at that distance has interacted. Here is "social golfing" implemented in this manner:
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;

pub fn social_golf(myvec: &mut Vec<f64>) {
    let len_vec = myvec.len();
    let len_vec_p2 = len_vec / 2;
    let symmetry = len_vec % 2 == 0;
    // Interact each element "x" with element "x + i (mod len_vec)",
    // where i is the distance between the elements.
    // Note that we only have to iterate through distances 1..=len/2 if the
    // interaction is commutative; this makes all disjoint pairs interact.
    for i in 1..=len_vec_p2 {
        let mut exit_level = false;
        let mut done: HashSet<usize> = HashSet::new();
        // Since we can't do (x+i, x+i+i) while x is started and x+i is in use,
        // we need two or three waves of threads for each distance level.
        while !exit_level {
            let mut skip: HashSet<usize> = HashSet::new();
            let mut pull: HashMap<usize, usize> = HashMap::new();
            let mut instructions_chan: Vec<mpsc::Sender<&mut f64>> = vec![];
            let mut first_open_channel: usize = 0;
            // Find disjoint, non-overlapping pairs.
            // Finished interactions go into "done"; elements used in the
            // current wave go into "skip" so they are not reused.
            // If i == len/2 the pairs are symmetric, so mark the pair "done" too.
            // "pull" maps each element to its thread number, so both halves
            // of a pair end up in the same thread.
            for k in 0..len_vec {
                let pair = modular_offset_in_range(k as u32, i as u32, 0, (len_vec - 1) as u32);
                if !done.contains(&k) && !skip.contains(&k) && !skip.contains(&(pair as usize)) {
                    pull.insert(k, first_open_channel);
                    pull.insert(pair as usize, first_open_channel);
                    first_open_channel += 1;
                    done.insert(k);
                    skip.insert(k);
                    skip.insert(pair as usize);
                    if symmetry && i == len_vec_p2 {
                        done.insert(pair as usize);
                    }
                }
            }
            thread::scope(|s| {
                let mut handles: Vec<thread::ScopedJoinHandle<()>> = vec![];
                // Spawn one thread per pair we could safely select for the
                // current wave.
                for _ in 0..pull.len() / 2 {
                    let (tx, rx) = mpsc::channel();
                    instructions_chan.push(tx);
                    let handle = s.spawn(move || {
                        println!("started thread");
                        let mut a = rx.recv().unwrap();
                        let mut b = rx.recv().unwrap();
                        play_golf(&mut a, &mut b);
                        println!("finished thread");
                    });
                    handles.push(handle);
                }
                // Send each element to the thread that uses it.
                for (k, a) in myvec.iter_mut().enumerate() {
                    if pull.contains_key(&k) {
                        println!("{}", pull[&k]);
                        instructions_chan[pull[&k]].send(a).unwrap();
                    }
                }
                // Wait for the current wave of threads to complete before
                // starting the next wave at this distance level.
                for h in handles {
                    h.join().unwrap();
                }
            });
            // We only insert vector indices into "done", so when
            // done.len() == len_vec, every element has interacted with its
            // pair for the current distance.
            if done.len() == len_vec {
                exit_level = true;
            }
        }
    }
}

pub fn modular_offset(begin: u32, offset: u32, modulo: u32) -> u32 {
    if begin + offset >= modulo {
        begin + offset - modulo
    } else {
        begin + offset
    }
}

pub fn modular_offset_in_range(begin: u32, offset: u32, modulo_begin: u32, modulo_end: u32) -> u32 {
    if begin + offset > modulo_end {
        modulo_begin + modular_offset(begin - modulo_begin, offset, modulo_end - modulo_begin + 1)
    } else {
        begin + offset
    }
}

pub fn play_golf(a: &mut f64, b: &mut f64) {
    *a += 101.0;
    *b += 101.0;
}

fn main() {
    let mut myvec1 = vec![0.0; 11];
    let mut myvec2 = vec![0.0; 6];
    social_golf(&mut myvec1);
    println!("{:#?}", myvec1);
    social_golf(&mut myvec2);
    println!("{:#?}", myvec2);
}
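The core trick above, sending disjoint `&mut` borrows into scoped worker threads over mpsc channels, can be reduced to a minimal sketch (this is an illustration only and omits the pairing logic):

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let mut data = vec![1.0_f64, 2.0];
    thread::scope(|s| {
        // The channel carries mutable borrows of elements of `data`;
        // the scope guarantees the worker cannot outlive those borrows.
        let (tx, rx) = mpsc::channel::<&mut f64>();
        s.spawn(move || {
            for x in rx {
                *x += 100.0; // mutate each element in place on the worker
            }
        });
        for x in data.iter_mut() {
            tx.send(x).unwrap();
        }
        // `tx` is dropped here, which ends the worker's receive loop.
    });
    println!("{:?}", data); // [101.0, 102.0]
}
```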
Upvotes: 0
Reputation: 59125
Let's start with the code:
// cargo-deps: crossbeam="0.7.3"
extern crate crossbeam;

const CHUNKS: usize = 10;
const CHUNK_SIZE: usize = 10;

fn main() {
    let mut table = [0; CHUNKS * CHUNK_SIZE];

    // Scoped threads allow the compiler to prove that no threads will outlive
    // `table` (which would be bad).
    let _ = crossbeam::scope(|scope| {
        // Chop `table` into disjoint sub-slices.
        for slice in table.chunks_mut(CHUNK_SIZE) {
            // Spawn a thread operating on that subslice.
            scope.spawn(move |_| write_slice(slice));
        }
        // `crossbeam::scope` ensures that *all* spawned threads join before
        // returning control back from this closure.
    });

    // At this point, all threads have joined, and we have exclusive access to
    // `table` again. Huzzah for 100% safe multi-threaded stack mutation!
    println!("{:?}", &table[..]);
}

fn write_slice(slice: &mut [i32]) {
    for (i, e) in slice.iter_mut().enumerate() {
        *e = i as i32;
    }
}
One thing to note is that this needs the crossbeam crate. Rust used to have a similar "scoped" construct in the standard library, but a soundness hole was found right before 1.0, so it was deprecated with no time to design a replacement. crossbeam is basically that replacement.
What Rust lets you do here is express the idea that, whatever the code does, none of the threads created within the call to crossbeam::scope will survive that scope. As such, anything borrowed from outside that scope lives longer than the threads. Thus, the threads can freely access those borrows without having to worry about things like, say, a thread outliving the stack frame that table is defined in and scribbling over the stack.
So this should do more or less the same thing as the C code, though without that nagging worry that you might have missed something. :)
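As an aside (this postdates the crate versions above): since Rust 1.63, scoped threads are back in the standard library as `std::thread::scope`, so the same pattern now works with no external dependency. A minimal sketch:

```rust
const CHUNKS: usize = 10;
const CHUNK_SIZE: usize = 10;

fn main() {
    let mut table = [0i32; CHUNKS * CHUNK_SIZE];
    // `std::thread::scope` (stable since Rust 1.63) guarantees every spawned
    // thread joins before the scope returns, so disjoint `&mut` sub-slices of
    // `table` may safely cross thread boundaries.
    std::thread::scope(|scope| {
        for slice in table.chunks_mut(CHUNK_SIZE) {
            scope.spawn(move || write_slice(slice));
        }
    });
    println!("{:?}", &table[..]);
}

fn write_slice(slice: &mut [i32]) {
    for (i, e) in slice.iter_mut().enumerate() {
        *e = i as i32;
    }
}
```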
Finally, here's the same thing using scoped_threadpool instead. The only real practical difference is that it lets us control how many threads are used.
// cargo-deps: scoped_threadpool="0.1.6"
extern crate scoped_threadpool;

const CHUNKS: usize = 10;
const CHUNK_SIZE: usize = 10;

fn main() {
    let mut table = [0; CHUNKS * CHUNK_SIZE];
    let mut pool = scoped_threadpool::Pool::new(CHUNKS as u32);
    pool.scoped(|scope| {
        for slice in table.chunks_mut(CHUNK_SIZE) {
            scope.execute(move || write_slice(slice));
        }
    });
    println!("{:?}", &table[..]);
}

fn write_slice(slice: &mut [i32]) {
    for (i, e) in slice.iter_mut().enumerate() {
        *e = i as i32;
    }
}
Upvotes: 21