Enthys

Reputation: 305

How can I move the data between threads safely?

I'm trying to write a function that takes multiple file names, reads the files, builds the appropriate structs, and returns them in a Vec<Audit>. I've been able to do this by reading the files one by one, but I want to do it using threads.

This is the function:

fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    let mut audits = Arc::new(Mutex::new(vec![]));
    let mut handlers = vec![];

    for file in files {
        let audits = Arc::clone(&audits);

        handlers.push(thread::spawn(move || {
            let mut audits = audits.lock().unwrap();
            audits.push(audit_from_xml_file(file.clone()));
            audits
        }));
    }

    for handle in handlers {
        let _ = handle.join();
    }

    audits
        .lock()
        .unwrap()
        .into_iter()
        .fold(vec![], |mut result, audit| {
            result.push(audit);
            result
        })
}

But it won't compile due to the following error:

error[E0277]: `MutexGuard<'_, Vec<Audit>>` cannot be sent between threads safely
   --> src/main.rs:82:23
    |
82  |         handlers.push(thread::spawn(move || {
    |                       ^^^^^^^^^^^^^ `MutexGuard<'_, Vec<Audit>>` cannot be sent between threads safely
    | 
   ::: /home/enthys/.rustup/toolchains/nightly-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/std/src/thread/mod.rs:618:8

I have tried wrapping the generated Audit structs in Some(Audit) to avoid the MutexGuard, but then I run into poisoned thread issues.

Upvotes: 0

Views: 630

Answers (2)

Masklinn

Reputation: 42282

The cause of the error is that after pushing the new Audit into the (locked) audits vec, you then try to return the vec's MutexGuard.

In Rust, a thread's function can actually return values; the point of doing that is to send the value back to whoever is joining the thread. This means the value is going to move between threads, so the value needs to be movable between threads (aka Send), which mutex guards have no reason to be[0].
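To make the return-value mechanism concrete, here is a minimal sketch (the helper name `sum_on_thread` is made up for illustration): whatever the closure returns comes back through `join()`, which is why its type has to be `Send`.

```rust
use std::thread;

// Hypothetical helper: sums the numbers on a separate thread and
// hands the result back to the caller through `join()`.
fn sum_on_thread(numbers: Vec<u32>) -> u32 {
    // The closure's return value (a `u32`, which is `Send`) becomes
    // the `Ok` payload of `JoinHandle::join`.
    let handle = thread::spawn(move || numbers.iter().sum::<u32>());

    // `join()` yields `Err` only if the thread panicked.
    handle.join().expect("worker thread panicked")
}
```

A `MutexGuard` in place of the `u32` is what triggers E0277 in the question.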

The easy solution is to just... not do that. Just delete the last line of the spawn closure. The code still won't compile after that, though, as you still have a borrowing issue with the expression at the end of the function.
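For reference, one way the Arc/Mutex version could look once both problems are dealt with. This is only a sketch: `Audit` and `audit_from_xml_file` below are stand-ins (the real ones are not shown in the question), and using `std::mem::take` to get the Vec out of the mutex is my choice, not the only option.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the question's type; the real `Audit` is not shown.
#[derive(Debug, PartialEq)]
struct Audit {
    source: String,
}

// Stand-in for the question's parser; the real one reads and parses XML.
fn audit_from_xml_file(file: String) -> Audit {
    Audit { source: file }
}

fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    let audits: Arc<Mutex<Vec<Audit>>> = Arc::new(Mutex::new(Vec::new()));
    let mut handlers = vec![];

    for file in files {
        let audits = Arc::clone(&audits);
        handlers.push(thread::spawn(move || {
            // Parse before taking the lock so the threads do not
            // serialize on the mutex while doing the real work.
            let audit = audit_from_xml_file(file);
            audits.lock().unwrap().push(audit);
            // Nothing is returned, so no guard leaves the thread.
        }));
    }

    for handle in handlers {
        handle.join().unwrap();
    }

    // Swap the finished Vec out of the mutex, leaving an empty one behind.
    let mut guard = audits.lock().unwrap();
    std::mem::take(&mut *guard)
}
```

Note that the audits end up in whatever order the threads finish, not in the order of `files`.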

An alternative is to lean into the feature (especially if Audit objects are not too big): drop the audits vec entirely and instead have each thread return its audit, then collect from the handlers when you join them:

pub fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    let mut handlers = vec![];

    for file in files {
        handlers.push(thread::spawn(move || {
            audit_from_xml_file(file)
        }));
    }
    
    handlers.into_iter()
        .map(|handler| handler.join().unwrap())
        .collect()
}

Though at that point you might as well just let Rayon handle it:

use rayon::prelude::*;

pub fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    files.into_par_iter().map(audit_from_xml_file).collect()
}

That also avoids crashing the program or bringing the machine to its knees if you happen to have millions of files, since Rayon runs the work on a fixed-size thread pool instead of spawning one OS thread per file.
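If pulling in Rayon is not an option, the thread count can also be capped with the standard library alone. The following is a rough sketch, not what Rayon does internally (Rayon uses a work-stealing pool): it splits the files into at most `max_threads` chunks and uses scoped threads. The name `generate_audits_bounded`, the `max_threads` parameter, and the `Audit` / `audit_from_xml_file` stand-ins are all assumptions made for the example.

```rust
use std::thread;

// Stand-in for the question's type; the real `Audit` is not shown.
#[derive(Debug, PartialEq)]
struct Audit {
    source: String,
}

// Stand-in for the question's parser; the real one reads and parses XML.
fn audit_from_xml_file(file: String) -> Audit {
    Audit { source: file }
}

// Hypothetical variant that never spawns more than `max_threads` threads.
fn generate_audits_bounded(files: Vec<String>, max_threads: usize) -> Vec<Audit> {
    if files.is_empty() {
        return Vec::new();
    }
    let workers = max_threads.max(1);
    // Ceiling division, so at most `workers` chunks are produced.
    let chunk_len = (files.len() + workers - 1) / workers;

    thread::scope(|scope| {
        // Spawn one scoped thread per chunk; collecting the handles first
        // makes sure every thread is started before any is joined.
        let handles: Vec<_> = files
            .chunks(chunk_len)
            .map(|chunk| {
                scope.spawn(move || {
                    chunk
                        .iter()
                        .cloned()
                        .map(audit_from_xml_file)
                        .collect::<Vec<Audit>>()
                })
            })
            .collect();

        // Joining in spawn order keeps the audits in input order.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().unwrap())
            .collect::<Vec<Audit>>()
    })
}
```

A reasonable value for `max_threads` would be something like `std::thread::available_parallelism()`, though static chunking like this balances load worse than Rayon when some files are much larger than others.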

[0] And they have every reason not to be: locking on one thread and unlocking on another is not necessarily supported, e.g. ReleaseMutex:

The ReleaseMutex function fails if the calling thread does not own the mutex object.

(NB: in Windows lingo, "owning" a mutex means having acquired it via WaitForSingleObject, which translates to lock in POSIX lingo)

and it can be plain undefined behaviour, e.g. pthread_mutex_unlock:

If a thread attempts to unlock a mutex that it has not locked or a mutex which is unlocked, undefined behavior results.

Upvotes: 3

rodrigo

Reputation: 98368

Your problem is that you are passing your Vec<Audit> (or, more precisely, the MutexGuard<Vec<Audit>>) to the threads and back again without really needing to.

And you don't need Mutex or Arc for this simpler task:

fn generate_audits_from_files(files: Vec<String>) -> Vec<Audit> {
    let mut handlers = vec![];

    for file in files {
        handlers.push(thread::spawn(move || {
            audit_from_xml_file(file)
        }));
    }

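    // `join()` returns a Result; `flat_map` keeps the Ok values and
    // silently skips any thread that panicked.
    handlers
        .into_iter()
        .flat_map(|x| x.join())
        .collect()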
}
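One thing to be aware of with `flat_map` over `join()`: a thread that panicked is dropped from the result without any notice. If you would rather find out, a possible variation is to collect into a `Result` instead. The sketch below uses a made-up generic helper, `join_all`, so it does not depend on `Audit`.

```rust
use std::thread::{self, JoinHandle};

// Hypothetical helper: joins every handle and returns all the values,
// or the panic payload of the first thread that panicked.
fn join_all<T>(handles: Vec<JoinHandle<T>>) -> thread::Result<Vec<T>> {
    handles
        .into_iter()
        .map(|handle| handle.join())
        // Collecting into a Result stops at the first Err; the remaining
        // handles are dropped, which detaches those threads.
        .collect()
}
```

In the function above, that would mean replacing the `flat_map` chain with `join_all(handlers)` and deciding what to do with the `Err` case (unwrap it, log it, or change the return type).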

Upvotes: 1
