Reputation: 2186
I have a setup where my program spawns several threads for CPU-bound computation using the std::thread::spawn
.
I need a GRPC server to handle incoming commands and also stream outputs done by the worker threads. I'm using tonic
for the GRPC server, and it only offers an async implementation inside a Tokio future.
I need to be able to send messages from my "normal" standard-library threads to the Tokio future.
I've boiled my code down the the minimum here:
use std::thread;
use tokio::sync::mpsc; // 1.9.0
fn main() {
let (tx, mut rx) = mpsc::channel(1);
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
tokio_runtime.spawn(async move {
// the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
while let Some(v) = rx.recv().await {}
});
let h = thread::spawn(move || {
// do work
tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
});
h.join().unwrap();
}
How can my main worker threads communicate with the Tokio-spawned GRPC server?
Upvotes: 3
Views: 1710
Reputation: 15683
You can use tokio's sync
features. There are two options - UnboundedSender
and Sender::blocking_send()
.
The issue with the unbounded sender is that it does not have back-pressure and if your producer is faster than the consumer your application may crash with an out-of-memory error or exhaust other limited resources your producer uses.
As a general rule, you should avoid using unbounded queues, which leaves us with the better option of using blocking_send()
:
use std::thread;
use tokio::sync::mpsc; // 1.9.0
fn main() {
let (tx, mut rx) = mpsc::channel(1);
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
tokio_runtime.spawn(async move {
// the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
while let Some(v) = rx.recv().await {
println!("Received: {:?}", v);
}
});
let h = thread::spawn(move || {
// do work
tx.blocking_send(1).unwrap();
});
h.join().unwrap();
}
Upvotes: 7