I am trying to create a multi-threaded app using a VecDeque. I wanted to use it as a shared queue with read-write permissions for all threads. I have the following code:
use std::collections::VecDeque;
use std::{thread, time};

fn main() {
    let mut workload = VecDeque::new();
    workload.push_back(0);

    let mut thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);
        loop {
            counter1 += 1;
            thread_1_queue.push_back(counter1);
            println!("Thread #1: {:?}", thread_1_queue);
            if counter1 == 10 {
                break;
            }
            thread::sleep(some_time);
        }
    });

    let mut thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);
        loop {
            counter2 += 1;
            thread_2_queue.push_back(counter2);
            println!("Thread #2: {:?}", thread_2_queue);
            if counter2 == 20 {
                break;
            }
            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);
    loop {
        if workload.capacity() == 10 {
            break;
        }
        println!("MainQueue: {:?}", workload);
        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}
Playground link (Beware that it will run endlessly)
My problem now is that the clones in the threads don't update the main queue. Each thread has its own queue instead of one shared queue, as shown here:
Thread #1: [0, 1]
MainQueue: [0]
Thread #2: [0, 11]
Thread #1: [0, 1, 2]
Thread #2: [0, 11, 12]
MainQueue: [0]
MainQueue: [0]
Thread #2: [0, 11, 12, 13]
Thread #1: [0, 1, 2, 3]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14]
Thread #1: [0, 1, 2, 3, 4]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15]
Thread #1: [0, 1, 2, 3, 4, 5]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16]
Thread #1: [0, 1, 2, 3, 4, 5, 6]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8]
MainQueue: [0]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
MainQueue: [0]
Upvotes: 3
Views: 4527
Wrap the queue in an Arc<Mutex<...>> so that all threads share and mutate one and the same queue:
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);
        loop {
            counter1 += 1;
            thread_1_queue.lock().unwrap().push_back(counter1);
            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());
            if counter1 == 10 {
                break;
            }
            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);
        loop {
            counter2 += 1;
            thread_2_queue.lock().unwrap().push_back(counter2);
            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());
            if counter2 == 20 {
                break;
            }
            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);
    loop {
        if workload.lock().unwrap().capacity() == 10 {
            break;
        }
        println!("MainQueue: {:?}", workload.lock().unwrap());
        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}
Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...
Arc creates a thread-safe reference-counted pointer with which you can share a single object between multiple threads. Note that the content of an Arc is always immutable, because multiple mutable references to the same object are never allowed in Rust. That's why you need a Mutex inside: it provides what is called interior mutability, meaning you can temporarily get mutable access to the object while the Mutex makes sure that this access doesn't collide with other threads.
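To make that concrete, here is a minimal standalone sketch (separate from your program) of how an Arc<Mutex<VecDeque<_>>> lets two owners mutate the same queue, even though neither handle is declared mut:

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    // One queue, wrapped so it can be shared and mutated safely.
    let shared = Arc::new(Mutex::new(VecDeque::new()));

    // Arc::clone copies only the pointer, not the queue itself.
    let handle = Arc::clone(&shared);
    let worker = thread::spawn(move || {
        // lock() returns a guard; through it we get mutable access
        // to the queue even though `handle` itself is not `mut`.
        handle.lock().unwrap().push_back(1);
    });
    worker.join().unwrap();

    shared.lock().unwrap().push_back(2);
    println!("{:?}", shared.lock().unwrap()); // prints [1, 2]
}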
Further, this means that when another thread calls lock() while the Mutex is already locked, that thread will block. This is what is called a bottleneck, and it limits how much speedup you get from your multithreading.
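If the work done per item is expensive, one common way to keep that bottleneck small is to hold the lock only while touching the queue and to do the slow part outside of it. A rough sketch of that pattern (expensive_work is just a made-up stand-in for your real processing):

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

// Stand-in for whatever processing one queue item needs.
fn expensive_work(item: i32) {
    thread::sleep(time::Duration::from_millis(50));
    println!("processed {}", item);
}

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::from(vec![1, 2, 3])));

    let queue = Arc::clone(&workload);
    let worker = thread::spawn(move || {
        loop {
            // The temporary lock guard is dropped at the end of this
            // statement, so the queue is NOT locked while the slow
            // processing below runs.
            let next = queue.lock().unwrap().pop_front();
            match next {
                Some(item) => expensive_work(item),
                None => break,
            }
        }
    });

    worker.join().unwrap();
}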
Also be aware that between two lock() calls the content of the queue can change. So if something has to happen atomically on the queue, you need to keep it locked for the entire duration of that action, which further reduces your speedup.
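For example, if a thread should only push when the queue is still short, the length check and the push must happen under one and the same lock; with two separate lock() calls, another thread could sneak a push in between. A small sketch of that idea (push_if_short is a hypothetical helper, not part of your code):

use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical helper: push only if the queue currently holds fewer
// than `limit` items. The length check and the push happen under the
// same lock guard, so no other thread can push in between.
fn push_if_short(queue: &Mutex<VecDeque<i32>>, value: i32, limit: usize) -> bool {
    let mut guard = queue.lock().unwrap();
    if guard.len() < limit {
        guard.push_back(value);
        true
    } else {
        false
    }
}

fn main() {
    let queue = Mutex::new(VecDeque::new());
    assert!(push_if_short(&queue, 1, 2));
    assert!(push_if_short(&queue, 2, 2));
    assert!(!push_if_short(&queue, 3, 2)); // already at the limit
    println!("{:?}", queue.lock().unwrap()); // prints [1, 2]
}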
Two more remarks about your code:
- You probably confused .capacity() and .len(): .capacity() is how much space the queue has allocated, while .len() is the number of elements it currently contains.
- .join() returns a Result, which I here will simply .unwrap().
Also, checking .len() == 10 won't work reliably in a multi-threaded scenario, because the length could jump directly from 9 to 11. So for multi-threaded scenarios it's better to check >= 10, which will always work.
Fixed code that doesn't run forever:
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);
        loop {
            counter1 += 1;
            thread_1_queue.lock().unwrap().push_back(counter1);
            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());
            if counter1 == 10 {
                break;
            }
            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);
        loop {
            counter2 += 1;
            thread_2_queue.lock().unwrap().push_back(counter2);
            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());
            if counter2 == 20 {
                break;
            }
            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);
    loop {
        if workload.lock().unwrap().len() >= 10 {
            break;
        }
        println!("MainQueue: {:?}", workload.lock().unwrap());
        thread::sleep(some_time);
    }

    thread_1.join().unwrap();
    thread_2.join().unwrap();
}
Thread #1: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #1: [0, 1, 11, 2]
Thread #2: [0, 1, 11, 2, 12]
MainQueue: [0, 1, 11, 2, 12]
Thread #1: [0, 1, 11, 2, 12, 3]
MainQueue: [0, 1, 11, 2, 12, 3]
Thread #2: [0, 1, 11, 2, 12, 3, 13]
MainQueue: [0, 1, 11, 2, 12, 3, 13]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4]
MainQueue: [0, 1, 11, 2, 12, 3, 13, 14, 4]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20, 10]
Upvotes: 6