Reputation: 553
I am trying to use tokio::sync::mpsc::channel
to send data from a synchronous function to a tokio thread so that it can be handled asynchronously.
Since I believed tokio::sync::mpsc::channel
is an async function, I spawn a runtime from the sync function to create rx and tx, move rx into a newly spawned task inside it, and then return tx.
However, it does not work as I expected. I did some debugging and found the following.
https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html.
The documentation says that tx reports itself as closed only when the rx handle gets dropped or explicitly calls the close
function. Neither seems to be the case here.
/// Connects to a websocket and returns `(ws_sender_tx, ws_receiver_stream)`:
/// the sending half of an mpsc channel whose messages are meant to be forwarded
/// to the socket as `Message::Text`, and the stream obtained from
/// `ws_reciver_sink.stream()`, into which received text messages are pushed.
///
/// # Errors
/// Propagates (via `?`) a failure to build the runtime and a failure of
/// `connect_async`.
///
/// NOTE(review): the runtime below is never stored in a variable, so it is
/// dropped as soon as the `block_on(...)` statement finishes. Per the tokio
/// `Runtime` documentation, dropping a runtime shuts down the tasks spawned on
/// it; that would drop `ws_sender_rx` and matches the posted output, where
/// `is_closed()` is `false` before `block_on` and `true` right after - confirm.
fn websocket(base: &Url, id: &str) -> Result<(Sender<String>, Stream<String>), BoxErr> {
// Elided in the original post; `url` used below is presumably built here from
// `base` and `id`.
...
let ws_reciver_sink = Sink::new();
let ws_receiver_stream = ws_reciver_sink.stream();
// Bounded channel (capacity 100) for outgoing messages; created before any
// runtime exists.
let (ws_sender_tx, mut ws_sender_rx) = mpsc::channel(100);
debug!("ws_sender_channel is closed 1: {}", ws_sender_tx.is_closed());
// Single-threaded runtime built as a temporary: it lives only until the end of
// this statement (see the NOTE(review) on the function).
runtime::Builder::new_current_thread()
.enable_all()
.build()?
.block_on(async move {
let (ws_stream, _res) = connect_async(url).await?;
let (mut ws_sender_inner, mut ws_receiver_inner) = ws_stream.split();
debug!("spawning ws_recv_task");
let ws_recv_task = tokio::spawn(async move {
// Stops at the first item that is not `Some(Ok(Message::Text(_)))`: end of
// stream, an error, or any non-text message.
while let Some(Ok(Message::Text(msg))) = ws_receiver_inner.next().await {
ws_reciver_sink.send(msg);
}
});
debug!("spawning ws_send_task");
let ws_send_task = tokio::spawn(async move {
// The posted output never shows this message, which suggests this task is
// never polled before the runtime goes away.
debug!("moving ws_sender_rx handle");
while let Some(msg) = ws_sender_rx.recv().await {
// On a send error the receive task is aborted, but this loop keeps
// running (there is no `break`).
if ws_sender_inner.send(Message::Text(msg)).await.is_err() {
ws_recv_task.abort();
}
// Logged after every forwarded message, inside the loop - not at the point
// where `ws_sender_rx` is actually dropped.
debug!("dropping ws_sender_rx handle");
}
});
// The async block returns right after spawning; it does not await either task.
// The turbofish pins the block's error type so the `?` above can convert.
Ok::<(), BoxErr>(())
})?;
debug!("ws_sender_channel is closed 2: {}", ws_sender_tx.is_closed());
Ok((ws_sender_tx, ws_receiver_stream))
}
Output
[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] connecting to websocket ws://127.0.0.1:8000/node/test-a
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 1: false
[2022-12-13T11:58:45Z DEBUG tungstenite::handshake::client] Client handshake done.
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_recv_task
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_send_task
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 2: true
[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed: true
thread 'tests::test_connect' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("Some nights")', iot/src/lib.rs:199:50
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
What am I missing? Please enlighten me.
Upvotes: 3
Views: 1284
Reputation: 1
I stumbled upon exactly the same situation, with nearly identical code :-)
let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);
The solution was to instantiate the channel within the async runtime:
[tokio::main] // <---------------------------
async fn main() -> Result<(), Box<dyn Error>> {
...
let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);
// async code
comm::connect(Handle::current(), rx);
...
// sync code here
...
}
--- async code ---
// Spawns a dedicated OS thread and, on it, drives the (elided) async block on
// the supplied runtime handle via `block_on`.
// `rx` is presumably consumed inside the elided async block - verify.
// NOTE(review): the snippet is cut off as posted; the function's closing `}` is
// missing.
pub fn connect(runtime: Handle, mut rx: Receiver<Pos2>) {
thread::spawn(move || {
runtime.block_on(async {
...
})
})
Upvotes: 0