heash

Reputation: 33

How to use VecDeque in multi-threaded app?

I am trying to create a multi-threaded app using a VecDeque. I wanted to use it as a shared queue with read-write permissions for all threads. I have the following code:

use std::collections::VecDeque;
use std::{thread, time};

fn main() {
    let mut workload = VecDeque::new();
    workload.push_back(0);

    let mut thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 +=1;
            thread_1_queue.push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue);

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let mut thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 +=1;
            thread_2_queue.push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue);

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        };
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload);

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}

Playground link (Beware that it will run endlessly)

My problem is that the clones in the threads don't update the main queue. Each thread now has its own queue instead of all of them sharing one, as shown here:

Thread #1: [0, 1]
MainQueue: [0]
Thread #2: [0, 11]
Thread #1: [0, 1, 2]
Thread #2: [0, 11, 12]
MainQueue: [0]
MainQueue: [0]
Thread #2: [0, 11, 12, 13]
Thread #1: [0, 1, 2, 3]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14]
Thread #1: [0, 1, 2, 3, 4]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15]
Thread #1: [0, 1, 2, 3, 4, 5]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16]
Thread #1: [0, 1, 2, 3, 4, 5, 6]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7]
MainQueue: [0]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8]
MainQueue: [0]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Thread #2: [0, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
Thread #1: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
MainQueue: [0]

Upvotes: 3

Views: 4527

Answers (1)

Finomnis

Reputation: 22738

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 += 1;
            thread_1_queue.lock().unwrap().push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 += 1;
            thread_2_queue.lock().unwrap().push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.lock().unwrap().capacity() == 10 {
            break;
        }

        println!("MainQueue: {:?}", workload.lock().unwrap());

        thread::sleep(some_time);
    }

    thread_1.join();
    thread_2.join();
}

Output:

Thread #1: [0, 1]
MainQueue: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #2: [0, 1, 11, 12]
Thread #1: [0, 1, 11, 12, 2]
MainQueue: [0, 1, 11, 12, 2]
Thread #2: [0, 1, 11, 12, 2, 13]
Thread #1: [0, 1, 11, 12, 2, 13, 3]
MainQueue: [0, 1, 11, 12, 2, 13, 3]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4]
MainQueue: [0, 1, 11, 12, 2, 13, 3, 14, 4]
Thread #2: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15]
Thread #1: [0, 1, 11, 12, 2, 13, 3, 14, 4, 15, 5]
...

Explanation

Arc is an atomically reference-counted pointer that lets multiple threads share ownership of a single object. Cloning an Arc does not copy the object; it only creates another handle to it. Note that an Arc on its own only gives shared (immutable) access to its content, because Rust never allows multiple mutable references to the same object.

That's why you need a Mutex inside the Arc. It provides what is called interior mutability: by locking it, you temporarily get mutable access to the object, and the Mutex makes sure that this mutable access doesn't collide with other threads.
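To make the difference between the two kinds of clone() concrete, here is a minimal sketch. The function names clone_plain_queue and clone_arc_handle are made up for this illustration; everything else is from the standard library. It contrasts cloning a plain VecDeque (which copies the data, as in the question) with cloning an Arc handle (which points to the same queue).

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Clones a plain VecDeque, pushes to the copy, and returns
/// (length of the original, length of the copy).
fn clone_plain_queue() -> (usize, usize) {
    let original: VecDeque<i32> = VecDeque::from(vec![0]);
    let mut copy = original.clone(); // deep copy: a second, independent queue
    copy.push_back(1);
    (original.len(), copy.len())
}

/// Clones an Arc handle, pushes through the clone from another thread,
/// and returns the content as seen through the original handle.
fn clone_arc_handle() -> Vec<i32> {
    let shared = Arc::new(Mutex::new(VecDeque::from(vec![0])));
    let handle = Arc::clone(&shared); // only bumps the reference count
    assert!(Arc::ptr_eq(&shared, &handle)); // both handles point to the same queue

    let worker = thread::spawn(move || {
        handle.lock().unwrap().push_back(1);
    });
    worker.join().unwrap();

    // The lock is released at the end of this statement.
    let snapshot: Vec<i32> = shared.lock().unwrap().iter().copied().collect();
    snapshot
}

fn main() {
    // The original queue never sees the push made on its copy.
    assert_eq!(clone_plain_queue(), (1, 2));
    // The push made through the cloned Arc is visible through the original handle.
    assert_eq!(clone_arc_handle(), vec![0, 1]);
    println!("ok");
}
```

Writing Arc::clone(&shared) instead of shared.clone() is only a style choice; both do the same thing, but the former makes it obvious that just the handle is cloned.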

This also means that when a thread calls lock() while the Mutex is already locked by another thread, the calling thread blocks until the lock is released. This lock contention is a bottleneck and will limit the speedup you get from multithreading.

Also be aware that between two lock() calls, the content of the queue could change. So if it's important that something happens atomically to the queue, you need to keep the queue locked for the entire duration of that action, which further reduces your speedup.
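As a rough sketch of what "keeping the queue locked for the entire action" can look like: the helper below (push_and_len and run_demo are hypothetical names, not part of the code above) pushes a value and reads the resulting length under one lock guard. Because no other thread can get in between the push and the len(), every call observes a distinct length.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// Pushes `value` and returns the queue length right after the push.
/// Both steps happen under a single lock, so no other thread can interleave.
fn push_and_len(queue: &Mutex<VecDeque<i32>>, value: i32) -> usize {
    let mut guard = queue.lock().unwrap();
    guard.push_back(value);
    guard.len()
    // `guard` is dropped here, which releases the lock
}

/// Runs 4 threads that each push 5 values and returns all observed lengths, sorted.
fn run_demo() -> Vec<usize> {
    let queue = Arc::new(Mutex::new(VecDeque::new()));
    let mut workers = Vec::new();

    for t in 0..4 {
        let queue = Arc::clone(&queue);
        workers.push(thread::spawn(move || {
            (0..5)
                .map(|i| push_and_len(&queue, t * 10 + i))
                .collect::<Vec<usize>>()
        }));
    }

    let mut lengths: Vec<usize> = workers
        .into_iter()
        .flat_map(|w| w.join().unwrap())
        .collect();
    lengths.sort();
    lengths
}

fn main() {
    // 20 pushes in total, each seeing a unique length from 1 to 20.
    let lengths = run_demo();
    assert_eq!(lengths, (1..=20).collect::<Vec<usize>>());
    println!("observed lengths: {:?}", lengths);
}
```

If the push and the len() used two separate lock() calls instead, two threads could observe the same length, and the assertion in main would no longer be guaranteed to hold.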

Further bugs

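  • I think you mixed up .capacity() and .len(): .capacity() is the number of elements the queue can hold without reallocating, while .len() is the number of elements currently in it.
  • You should do something with the Result of .join(); here I will simply .unwrap() it.
  • .len() == 10 won't work reliably in a multi-threaded scenario, because between two checks the length could jump directly from 9 to 11. So for multi-threaded scenarios, it's better to check >= 10, which will always work.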
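The first and third points can be illustrated with a small single-threaded sketch. The helper first_hit and the list of observed lengths are invented for this example; the list simply simulates what a polling thread might see if two pushes happen between two of its checks.

```rust
use std::collections::VecDeque;

/// Returns the first observed length for which `done` is true, if any.
fn first_hit(observed: &[usize], done: impl Fn(usize) -> bool) -> Option<usize> {
    observed.iter().copied().find(|&len| done(len))
}

fn main() {
    // len() counts the stored elements; capacity() is the allocated space.
    let mut queue: VecDeque<i32> = VecDeque::with_capacity(32);
    queue.push_back(0);
    assert_eq!(queue.len(), 1);
    assert!(queue.capacity() >= 32);

    // Simulated lengths seen by a polling loop: the value 10 is never observed.
    let observed = [7, 9, 11, 12];

    // An exact comparison misses the threshold entirely...
    assert_eq!(first_hit(&observed, |len| len == 10), None);
    // ...while `>=` triggers on the first check past it.
    assert_eq!(first_hit(&observed, |len| len >= 10), Some(11));

    println!("ok");
}
```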

Fixed code that doesn't run forever:

use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::{thread, time};

fn main() {
    let workload = Arc::new(Mutex::new(VecDeque::new()));
    workload.lock().unwrap().push_back(0);

    let thread_1_queue = workload.clone();
    let thread_1 = thread::spawn(move || {
        let mut counter1: i32 = 0;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter1 += 1;
            thread_1_queue.lock().unwrap().push_back(counter1);

            println!("Thread #1: {:?}", thread_1_queue.lock().unwrap());

            if counter1 == 10 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let thread_2_queue = workload.clone();
    let thread_2 = thread::spawn(move || {
        let mut counter2: i32 = 10;
        let some_time = time::Duration::from_millis(50);

        loop {
            counter2 += 1;
            thread_2_queue.lock().unwrap().push_back(counter2);

            println!("Thread #2: {:?}", thread_2_queue.lock().unwrap());

            if counter2 == 20 {
                break;
            }

            thread::sleep(some_time);
        }
    });

    let some_time = time::Duration::from_millis(50);

    loop {
        if workload.lock().unwrap().len() >= 10 {
            break;
        }

        println!("MainQueue: {:?}", workload.lock().unwrap());

        thread::sleep(some_time);
    }

    thread_1.join().unwrap();
    thread_2.join().unwrap();
}

Output:

Thread #1: [0, 1]
Thread #2: [0, 1, 11]
MainQueue: [0, 1, 11]
Thread #1: [0, 1, 11, 2]
Thread #2: [0, 1, 11, 2, 12]
MainQueue: [0, 1, 11, 2, 12]
Thread #1: [0, 1, 11, 2, 12, 3]
MainQueue: [0, 1, 11, 2, 12, 3]
Thread #2: [0, 1, 11, 2, 12, 3, 13]
MainQueue: [0, 1, 11, 2, 12, 3, 13]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4]
MainQueue: [0, 1, 11, 2, 12, 3, 13, 14, 4]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9]
Thread #2: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20]
Thread #1: [0, 1, 11, 2, 12, 3, 13, 14, 4, 15, 5, 16, 6, 17, 7, 18, 8, 19, 9, 20, 10]

Upvotes: 6
