lulijun

Reputation: 575

Is it possible to implement a feature like Java's CompletableFuture::complete in Rust?

I'm a beginner in Rust, and I'm trying to use its asynchronous programming features.

In my scenario, I want to create an empty Future and complete it from another thread after a complex multi-round scheduling process. Java's CompletableFuture::complete meets this need very well.

I have tried to find an equivalent in Rust, but haven't found one yet.

Is it possible to do it in Rust?

I understand from the comments below that using a channel for this is more in line with Rust's design.

My scenario is a hierarchical scheduling executor. For example, Task1 is split into several Drivers, and each Driver uses multiple threads (a rayon thread pool) to do some computation. A state change in one driver triggers the execution of the next driver. The result of the whole task is the last driver's output; the intermediate drivers have no output. That is to say, my async function cannot get the result directly from a single spawned task, so I need a shared variable or a channel to transfer the result.

So what I really want is this: the last driver, which runs in a rayon thread, should be able to look up a channel's tx by its identifier without storing the tx itself (to simplify the state change process).

I found that the tx and rx of a oneshot channel cannot be cloned, and that the send method of tx takes ownership of the sender. So I can't store the tx in the main thread and let the last driver find its tx by identifier. I can do that with mpsc, but I have to create the mpsc channel with capacity 1 and close it manually.

I wrote 2 demos, shown below. Is this an appropriate and efficient use of mpsc?

Version implemented using oneshot (does not compile):

#[tokio::test]
pub async fn test_async() -> Result<()>{
    let mut executor = Executor::new();

    let res1 = executor.run(1).await?;
    let res2 = executor.run(2).await?;
    println!("res1 {}, res2 {}", res1, res2);
    Ok(())
}

struct Executor {
    pub pool: ThreadPool,
    pub txs: Arc<DashMap<i32, RwLock<oneshot::Sender<i32>>>>,
}

impl Executor {
    pub fn new() -> Self {
        Executor{
            pool: ThreadPoolBuilder::new().num_threads(10).build().unwrap(),
            txs: Arc::new(DashMap::new()),
        }
    }

    pub async fn run(&mut self, index: i32) -> Result<i32> {
        let (tx, rx) = oneshot::channel();
        self.txs.insert(index, RwLock::new(tx));
        let txs_clone = self.txs.clone();
        self.pool.spawn(move || {
            let spawn_tx = txs_clone.get(&index).unwrap();
            let guard = block_on(spawn_tx.read());
            // does not compile: `send` takes the sender by value, and it cannot be moved out of the guard
            guard.send(index);
        });

        let res = rx.await;
        return Ok(res.unwrap());
    }
}

Version implemented using mpsc (works, but I'm not sure about performance):

#[tokio::test]
pub async fn test_async() -> Result<()>{
    let mut executor = Executor::new();

    let res1 = executor.run(1).await?;
    let res2 = executor.run(2).await?;
    println!("res1 {}, res2 {}", res1, res2);
    // close channel after task finished
    executor.close(1);
    executor.close(2);
    Ok(())
}

struct Executor {
    pub pool: ThreadPool,
    pub txs: Arc<DashMap<i32, RwLock<mpsc::Sender<i32>>>>,
}

impl Executor {
    pub fn new() -> Self {
        Executor{
            pool: ThreadPoolBuilder::new().num_threads(10).build().unwrap(),
            txs: Arc::new(DashMap::new()),
        }
    }

    pub fn close(&mut self, index:i32) {
        self.txs.remove(&index);
    }

    pub async fn run(&mut self, index: i32) -> Result<i32> {
        let (tx, mut rx) = mpsc::channel(1);
        self.txs.insert(index, RwLock::new(tx));
        let txs_clone = self.txs.clone();
        self.pool.spawn(move || {
            let spawn_tx = txs_clone.get(&index).unwrap();
            let guard = block_on(spawn_tx.value().read());
            block_on(guard.deref().send(index));
        });
        // 0 is a placeholder meaning "no value received"
        let mut res:i32 = 0;
        while let Some(data) = rx.recv().await {
            println!("recv data {}", data);
            res = data;
            break;
        }
        return Ok(res);
    }
}

Upvotes: 1

Views: 430

Answers (1)

Matthieu M.

Reputation: 299740

Disclaimer: It's really hard to picture what you are attempting to achieve, because the examples provided are trivial to solve, with no justification for the added complexity (DashMap). As such, this answer will be progressive, though it will remain focused on solving the problem you demonstrated you had, and not necessarily the problem you're thinking of... as I have no crystal ball.

We'll be using the following Result type in the examples:

use std::error::Error;

type Result<T> = std::result::Result<T, Box<dyn Error + Send + Sync + 'static>>;

Serial execution

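The simplest way to execute a task is to do so right here, right now.

use std::future::Future;

impl Executor {
    pub async fn run<F, Fut>(&self, task: F) -> Result<i32>
    where
        // `Future` is a trait, so the returned future needs its own type parameter.
        F: FnOnce() -> Fut,
        Fut: Future<Output = Result<i32>>,
    {
        task().await
    }
}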

Async execution - built-in

When the execution of a task may involve heavy-weight calculations, it may be beneficial to execute it on a background thread.

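Whichever runtime you are using probably supports this functionality; I'll demonstrate with tokio:

impl Executor {
    pub async fn run<F>(&self, task: F) -> Result<i32>
    where
        // The closure moves to another thread, hence `Send + 'static`.
        F: FnOnce() -> Result<i32> + Send + 'static,
    {
        // First `?` handles the JoinError, second `?` handles the task's own error.
        Ok(tokio::task::spawn_blocking(task).await??)
    }
}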

Async execution - one-shot

If you wish to have more control over the number of CPU-bound threads, either to limit them or to partition the CPUs of the machine for different needs, then the async runtime may not be configurable enough and you may prefer to use a thread pool instead.

In this case, synchronization back with the runtime can be achieved via channels, the simplest of which is the oneshot channel:

impl Executor {
    pub async fn run<F>(&self, task: F) -> Result<i32>
    where
        // The closure moves to a pool thread, hence `Send + 'static`.
        F: FnOnce() -> Result<i32> + Send + 'static,
    {
        let (tx, rx) = oneshot::channel();

        self.pool.spawn(move || {
            let result = task();

            //  `send` fails only if the receiver was dropped; decide how to
            //  handle the fact that nobody will read the result.
            let _ = tx.send(result);
        });

        // First `?` handles a dropped sender, second `?` handles the task's own error.
        Ok(rx.await??)
    }
}
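If the sender really has to be looked up by an identifier, as in the question, one option is to remove it from the map rather than borrow it: removal hands back an owned sender, which is what a consuming send needs. The following is a minimal sketch of that idea, not a drop-in replacement for the code above. It assumes std::sync::mpsc and a Mutex<HashMap> as stand-ins for the oneshot channel and the DashMap so that it has no dependencies, and the names Pending, register, complete and run_by_id are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical registry of pending completions, keyed by task id.
#[derive(Clone, Default)]
struct Pending {
    txs: Arc<Mutex<HashMap<i32, Sender<i32>>>>,
}

impl Pending {
    /// Registers a sender under `id` and returns the matching receiver.
    fn register(&self, id: i32) -> Receiver<i32> {
        let (tx, rx) = mpsc::channel();
        self.txs.lock().unwrap().insert(id, tx);
        rx
    }

    /// Removes the sender for `id` and sends `value` through it.
    /// Returns false if the id is unknown, was already completed,
    /// or the receiver has been dropped.
    fn complete(&self, id: i32, value: i32) -> bool {
        // `remove` yields an owned sender, so a sender whose `send`
        // consumes `self` could be used here in the same way.
        let tx = self.txs.lock().unwrap().remove(&id);
        match tx {
            Some(tx) => tx.send(value).is_ok(),
            None => false,
        }
    }
}

/// Runs `task` on another thread; the thread only knows the id, not the sender.
fn run_by_id<F>(pending: &Pending, id: i32, task: F) -> i32
where
    F: FnOnce() -> i32 + Send + 'static,
{
    let rx = pending.register(id);
    let registry = pending.clone();
    let worker = thread::spawn(move || {
        let result = task();
        registry.complete(id, result);
    });
    let value = rx.recv().unwrap();
    worker.join().unwrap();
    value
}
```

Because the entry is removed on completion, there is nothing left to close by hand afterwards, and a second complete call for the same id simply reports false.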

Note that in all of the above solutions, task remains agnostic as to how it's executed. This is a property you should strive for, as it makes it easier to change the way execution is handled in the future by more neatly separating the two concepts.
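As for the literal question: a oneshot channel is, in effect, the Rust counterpart of an empty future that another thread completes later. To make that concrete, here is a rough sketch, using only the standard library, of what such a pair can look like under the hood. It is an illustration under stated assumptions and not how tokio implements its channel; the names Completable, Completer, completable, block_on and demo are all hypothetical, and block_on is a deliberately tiny executor included only to keep the example self-contained.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

/// State shared between the awaiting half and the completing half.
struct Shared<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

/// The half that is awaited.
struct Completable<T>(Arc<Mutex<Shared<T>>>);
/// The half that supplies the value; `complete` consumes it, so it fires at most once.
struct Completer<T>(Arc<Mutex<Shared<T>>>);

fn completable<T>() -> (Completer<T>, Completable<T>) {
    let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
    (Completer(shared.clone()), Completable(shared))
}

impl<T> Completer<T> {
    fn complete(self, value: T) {
        let waker = {
            let mut shared = self.0.lock().unwrap();
            shared.value = Some(value);
            shared.waker.take()
        };
        // Wake the waiting task, if any, after releasing the lock.
        if let Some(waker) = waker {
            waker.wake();
        }
    }
}

impl<T> Future for Completable<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut shared = self.0.lock().unwrap();
        match shared.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Remember who to wake once the value arrives.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread that is blocked in `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor: poll, park until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Creates an empty future, completes it from another thread, and awaits it.
fn demo() -> i32 {
    let (completer, future) = completable::<i32>();
    let worker = thread::spawn(move || completer.complete(42));
    let value = block_on(async { future.await });
    worker.join().unwrap();
    value
}
```

One design gap worth noting: if the Completer is dropped without calling complete, this sketch waits forever, whereas tokio's oneshot receiver resolves to an error in that case. That is one of several reasons to prefer the library channel over a hand-rolled version.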

Upvotes: 0
