Boris Mulder
Boris Mulder

Reputation: 364

Using a std::sync::mpsc::channel to send data from an async task to sync world

I have a program that is largely synchronous. Only for certain operations, it needs to perform async calls from a private library. So I'm building a new tokio runtime on which I spawn some worker task, and then pass the data from that task to my sync world using a channel, which calls recv() in a blocking manner.

Greatly simplified, the problematic part of the code looks like this:

use std::sync::mpsc::{channel, Sender};

struct Worker {
    chan: Sender<bool>
}

impl Worker {
    async fn do_work(&self) {
        loop {
            // do some more async work
            self.chan.send(true).unwrap();
        }
    }
}

fn main() {
    let (tx, rx) = channel::<bool>();
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap();
    
    let _ = rt.spawn(async move {
        let worker = Worker { chan: tx };
        worker.do_work().await;
    });
    
    println!("received {}", rx.recv().unwrap());
}

On playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=156e86c82839ef9d2a50fbcb8a67b412

But this code does not compile, and the relevant message is:

within `Worker`, the trait `Sync` is not implemented for `std::sync::mpsc::Sender<bool>`
 note: future is not `Send` as this value is used across an await

The channel is only passed to one thread, and cloning the sender like in this question does not solve it. Similarly, using an async channel is not an option since the receiver is sync. Instead it might have to do with how it is contained within the worker object, as the error message suggests. But in my case, the worker object needs to do other things, and call the method that calls send() multiple times, among some other async operations. It needs to be wrapped in an object since it is passed to another library function (it is always moved, not passed by reference though).

What can I do to make this work?

Upvotes: 3

Views: 3039

Answers (1)

Chayim Friedman
Chayim Friedman

Reputation: 71370

The hard way

Never hold a reference to the channel or the worker between await points. For example, take self instead of &self in your code.

The easy way

Use an async channel. tokio's channels have a blocking_recv() method you can use in synchronous contexts.

Or use a different channel implementation, such as crossbeam's, whose Sender is Sync.

Upvotes: 3

Related Questions