heroxav

Is there a way of spawning new threads with copies of existing data that have a non-static lifetime?

I have a problem that is similar to what is discussed in Is there a succinct way to spawn new threads with copies of existing data?. Unlike the linked question, I am trying to move an object with an associated lifetime into the new thread.

Intuitively, what I am trying to do is to copy everything that is necessary to continue the computation to the new thread and exit the old one. However, when trying to move cloned data (with a lifetime) to the new thread, I get the following error:

error[E0759]: data has lifetime 'a but it needs to satisfy a 'static lifetime requirement

I created a reproducible example based on the referenced question here. This is just to illustrate the problem: here the lifetimes could easily be removed, but in my actual use case the data I want to move to the thread is much more complex.

Is there an easy way of making this work with Rust?

Answers (1)

Todd

A qualified answer to the question in the title is "yes", but we can't do it by copying non-static references. The reasons for this seeming limitation are sound. The way we can get the required data/objects into the thread closures is by passing ownership of them (or copies of them, or other concrete objects that represent them) to the closures.
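To make the "pass ownership, not references" idea concrete before getting into pyo3, here is a minimal std-only sketch. The helper spawn_with_owned_copies is hypothetical. It receives borrowed data with a non-'static lifetime and still spawns threads, because each thread is handed an owned copy rather than the borrow itself.

```rust
use std::thread;

// Hypothetical helper for illustration. `label` is borrowed for some
// non-'static lifetime, but each thread only ever sees an owned copy.
fn spawn_with_owned_copies(label: &str, n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|i| {
            // `to_string()` makes an owned String; `move` transfers it
            // into the thread, so no borrow of `label` escapes.
            let owned = label.to_string();
            thread::spawn(move || format!("{}-{}", owned, i))
        })
        .collect();
    // Join in spawn order so the results come back in a fixed order.
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let local = String::from("worker"); // a local, not 'static
    println!("{:?}", spawn_with_owned_copies(&local, 3));
}
```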

It may not be immediately clear how to do this with a complex library like pyo3, since much of its API returns references to objects rather than concrete objects that can be passed as-is to other threads. However, the library does provide ways to pass Python data/objects to other threads, which I'll cover in the second example below.

The start() function will need to put a 'static bound on the closure type associated with its data parameter because within its body, start() is passing these closures on to other threads. The compiler is working to guarantee that the closures aren't holding on to references to anything that may evaporate if a thread runs longer than its parent, which is why it gripes without the 'static guarantee.

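use std::sync::Arc;
use std::thread;

// The 'static bound on the closure type promises that the closures borrow
// nothing that could be dropped while the spawned threads are still running.
fn start(data      : Vec<Arc<dyn Fn() -> f64 + Send + Sync + 'static>>,
         more_data : String)
{
    for _ in 1..=4 {
        let cloned_data = data.clone();
        let cloned_more_data = more_data.clone();
        thread::spawn(move || foo(cloned_data, cloned_more_data));
    }
}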
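For reference, below is a self-contained, std-only version of the start() snippet above that compiles on its own. The body of foo is a hypothetical stand-in (it just runs each closure and sums the results), and this start() returns its JoinHandles so the caller can wait for the threads, as the full example further down also does.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Same shape as the closure type in the answer, with the 'static bound.
type Job = dyn Fn() -> f64 + Send + Sync + 'static;

// Hypothetical `foo`: runs every closure and returns the sum.
fn foo(data: Vec<Arc<Job>>, more_data: String) -> f64 {
    let total: f64 = data.iter().map(|f| f()).sum();
    println!("{}: {}", more_data, total);
    total
}

fn start(data: Vec<Arc<Job>>, more_data: String) -> Vec<JoinHandle<f64>> {
    (1..=4)
        .map(|_| {
            // Each thread gets its own Vec of Arc pointers and its own String.
            let cloned_data = data.clone();
            let cloned_more_data = more_data.clone();
            thread::spawn(move || foo(cloned_data, cloned_more_data))
        })
        .collect()
}

fn main() {
    let factor = 2.5_f64; // Copy type, so `move` copies it into the closure
    let job: Arc<Job> = Arc::new(move || factor * 2.0);
    let handles = start(vec![Arc::clone(&job), job], "Important data.".to_string());
    for h in handles {
        assert_eq!(h.join().unwrap(), 10.0);
    }
}
```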

A 'static bound is different from a 'static lifetime applied to a reference (T: 'static vs. &'static T). A bound only means that the type it's applied to doesn't contain any non-static references (if it holds any references at all). It's pretty common to see this bound applied to method parameters in threaded code.
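A small std-only sketch of that distinction; describe_in_thread is a hypothetical function. An owned String and a string literal both satisfy T: 'static, while a reference to a local would not, even though none of these values is required to live forever.

```rust
use std::fmt::Debug;
use std::thread;

// `T: 'static` constrains the type: T may own its data or hold only
// 'static references, but it must not borrow from a shorter-lived scope.
fn describe_in_thread<T: Debug + Send + 'static>(value: T) -> String {
    thread::spawn(move || format!("{:?}", value)).join().unwrap()
}

fn main() {
    // An owned String satisfies the bound: the type contains no borrows.
    let owned = String::from("owned");
    println!("{}", describe_in_thread(owned));

    // A string literal is a &'static str, which also satisfies it.
    println!("{}", describe_in_thread("literal"));

    // This would NOT compile: `&local` borrows a local for a non-'static
    // lifetime, so `&String` here fails the `T: 'static` bound.
    // let local = String::from("local");
    // describe_in_thread(&local);
}
```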

Applied to the pyo3 problem space, this means we can avoid forming closures that contain non-static references by converting any such references to owned objects. Then, when the callback running in another thread needs to do something with them, it can acquire the GIL and cast them back to Python object references.
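Since pyo3 itself can't be shown without a Python installation, here is a std-only analogy of the same shape, offered as an assumption-laden sketch rather than pyo3's API. An Arc clone stands in for the owned PyObject handle, and locking a Mutex stands in for acquiring the GIL before the data can be used through a borrow. SharedDict and read_in_threads are hypothetical names; the real GIL is acquired with Python::with_gil, as in the code below.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for a GIL-protected object: an owned, shareable
// handle whose contents can only be reached while a lock is held.
type SharedDict = Arc<Mutex<HashMap<String, String>>>;

fn read_in_threads(dict: &SharedDict, key: &str, n: usize) -> Vec<Option<String>> {
    let handles: Vec<_> = (0..n)
        .map(|_| {
            // Convert what we have into owned values before spawning:
            // an Arc clone of the handle and an owned copy of the key.
            let handle = Arc::clone(dict);
            let key = key.to_string();
            thread::spawn(move || {
                // Analogue of acquiring the GIL: take the lock, then use
                // the data only through a borrow tied to the guard.
                let guard = handle.lock().unwrap();
                guard.get(&key).cloned()
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let mut map = HashMap::new();
    map.insert("os".to_string(), "<module 'os'>".to_string());
    let dict: SharedDict = Arc::new(Mutex::new(map));
    println!("{:?}", read_in_threads(&dict, "os", 2));
}
```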

More about this in the code comments below. I took a simple example from the pyo3 GitHub README and combined it with the code provided in the playground example.

Something to watch out for when applying this pattern is deadlock. The threads need to acquire the GIL in order to use the Python objects they have access to. In the example, once the parent thread is done spawning new threads, it releases the GIL when the closure passed to Python::with_gil returns. Only then does the parent wait for the child threads to complete by joining their handles; joining while still holding the GIL would deadlock, since the children would block forever waiting for it.
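The ordering that avoids the deadlock can be sketched with std only, again using a Mutex as a hypothetical stand-in for the GIL (run_workers is an illustrative name, not part of pyo3). The parent holds the lock while spawning, drops it, and only then joins.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for the GIL: one global lock that every thread must take.
fn run_workers(n: usize) -> usize {
    let lock = Arc::new(Mutex::new(0_usize));

    let handles: Vec<_> = {
        // The parent holds the lock while spawning (like inside with_gil)...
        let _guard = lock.lock().unwrap();
        (0..n)
            .map(|_| {
                let lock = Arc::clone(&lock);
                thread::spawn(move || {
                    // Each worker blocks here until the parent lets go.
                    *lock.lock().unwrap() += 1;
                })
            })
            .collect()
        // ...and `_guard` is dropped at the end of this block.
    };

    // Joining before the block above ended would deadlock: the workers
    // could never take the lock, and the parent would wait forever.
    for h in handles {
        h.join().unwrap();
    }
    let count = *lock.lock().unwrap();
    count
}

fn main() {
    println!("{}", run_workers(4));
}
```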

use std::thread;
use std::thread::JoinHandle;
use std::sync::Arc;

use pyo3::prelude::*;
use pyo3::types::IntoPyDict;
use pyo3::types::PyDict;

type MyClosure<'a> = dyn Fn() -> f64 + Send + Sync + 'a;

fn main() -> Result<(), ()> 
{
    match Python::with_gil(|py| main_(py)
        .map_err(|e| e.print_and_set_sys_last_vars(py))) 
    {
        Ok(handles) => {
            for handle in handles {
                handle.join().unwrap();
            }},
        Err(e) => { println!("{:?}", e); },
    }
    Ok(())
}

fn main_(py: Python) -> PyResult<Vec<JoinHandle<()>>> 
{
    let sys     = py.import("sys")?;
    let version = sys.get("version")?.extract::<String>()?;
    let locals  = [("os", py.import("os")?)].into_py_dict(py);
    let code    = "os.getenv('USER') or os.getenv('USERNAME') or 'Unknown'";
    let user    = py.eval(code, None, Some(&locals))?.extract::<String>()?;
    
    println!("Hello {}, I'm Python {}", user, version);
    
    // The thread will do something with the `locals` dictionary. In order to
    // pass this reference object to the thread, first convert it to a 
    // non-reference object.
    
    // Convert `locals` to `PyObject`.
    let locals_obj = locals.to_object(py);
    
    // Now we can move `locals_obj` into the thread without concern.
    let closure: Arc<MyClosure<'static>> = Arc::new(move || {
    
        // Printing the PyObject shows a Py wrapper around the raw pointer
        // to the underlying Python object.
        println!("{:?}", locals_obj);
        
        // If we want to do anything with the `locals` object, we can cast it
        // back to a `PyDict` reference. We'll need to acquire the GIL first.
        Python::with_gil(|py| {
        
            // We have the GIL, cast the dict back to a PyDict reference.
            let py_dict = locals_obj.cast_as::<PyDict>(py).unwrap();
            
            // Printing it out reveals it to be a dictionary with the key `os`.
            println!("{:?}", py_dict);
        });
        1.
    });
    let data    = vec![closure];
    let more    = "Important data.".to_string();    
    let handles = start(data, more);
    Ok(handles)
}

fn start(data : Vec<Arc<MyClosure<'static>>>,
             more : String
            ) -> Vec<JoinHandle<()>>
{
    let mut handles = vec![];
    for _ in 1..=4 {
        let cloned_data = data.clone();
        let cloned_more = more.clone();
        
        let h = thread::spawn(move || foo(cloned_data, cloned_more));
        handles.push(h);
    }
    handles
}           

fn foo<'a>(data : Vec<Arc<MyClosure<'a>>>,
           _more : String) // unused here; kept to match the original signature
{
    for closure in data {
        closure();
    }
}

Output:

Hello todd, I'm Python 3.8.10 (default, Jun  2 2021, 10:49:15) 
[GCC 9.4.0]
Py(0x7f3329ccdd40)
Py(0x7f3329ccdd40)
Py(0x7f3329ccdd40)
{'os': <module 'os' from '/usr/lib/python3.8/os.py'>}
{'os': <module 'os' from '/usr/lib/python3.8/os.py'>}
{'os': <module 'os' from '/usr/lib/python3.8/os.py'>}
Py(0x7f3329ccdd40)
{'os': <module 'os' from '/usr/lib/python3.8/os.py'>}
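All four threads print the same Py(0x7f3329ccdd40) address. That is expected: data.clone() in start() clones a Vec of Arcs, which copies pointers and bumps reference counts, so every thread calls the one closure and reads the one captured PyObject. The std-only sketch below checks this with Arc::ptr_eq and Arc::strong_count; clone_for_threads is a hypothetical helper mirroring the per-thread clone.

```rust
use std::sync::Arc;

// Same shape as the answer's MyClosure<'static>.
type MyClosure = dyn Fn() -> f64 + Send + Sync + 'static;

// Hypothetical helper mirroring the clone that start() makes per thread.
fn clone_for_threads(data: &[Arc<MyClosure>], copies: usize) -> Vec<Vec<Arc<MyClosure>>> {
    (0..copies).map(|_| data.to_vec()).collect()
}

fn main() {
    let closure: Arc<MyClosure> = Arc::new(|| 1.0_f64);
    let data = vec![closure];
    let per_thread = clone_for_threads(&data, 4);

    // Every copy points at the same allocation: the closure (and whatever
    // it captured) exists once; only the reference count went up.
    assert!(per_thread.iter().all(|v| Arc::ptr_eq(&v[0], &data[0])));
    assert_eq!(Arc::strong_count(&data[0]), 5);
}
```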

Something to consider: you may be able to minimize, or eliminate, the need to pass Python objects to the threads by extracting all the information needed from them into Rust objects and passing those to threads instead.
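A minimal sketch of that alternative, assuming the needed values have already been pulled out of Python (for instance with .extract::<String>() while holding the GIL, as main_ does for user and version). Snapshot and greet_from_threads are hypothetical names; once the data is plain Rust, the threads need neither the GIL nor any lifetimes.

```rust
use std::thread;

// Hypothetical plain-Rust snapshot of what the workers need, extracted
// from Python objects up front.
#[derive(Clone, Debug)]
struct Snapshot {
    user: String,
    version: String,
}

fn greet_from_threads(snapshot: &Snapshot, n: usize) -> Vec<String> {
    let handles: Vec<_> = (0..n)
        .map(|i| {
            // An owned copy with no borrowed data inside, so it is 'static.
            let snap = snapshot.clone();
            thread::spawn(move || {
                format!("[{}] Hello {}, Python {}", i, snap.user, snap.version)
            })
        })
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let snap = Snapshot { user: "todd".to_string(), version: "3.8.10".to_string() };
    for line in greet_from_threads(&snap, 2) {
        println!("{}", line);
    }
}
```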
