lighthouse
lighthouse

Reputation: 553

Unexpected close of tokio mpsc channel

I am trying to use tokio::mpsc::channel to send data from a synchronous function to tokio thread to handle it asynchronously.

Since tokio::mpsc::channel is an async function, I spawn a runtime from the sync function to create rx and tx and return tx after moving rx to the newly spawned task in it.

However, it does not work as I expected and I have conducted some debugging and found out the followings.

  1. The channel is not closed right after the creation.
  2. The channel reported itself to be closed right after the rx moved into a separate task.
  3. Right at the moment the channel reported itself to be closed, the spawned thread, which holds the moved rx seems does not even start. Hence couldn't be dropped, I guess.

https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Sender.html. The documentation says that the tx report itself is closed only when rx handle gets dropped or explicitly calls the close function. It seems neither in this case.

 fn websocket(base: &Url, id: &str) -> Result<(Sender<String>, Stream<String>), BoxErr> {
        
        ...

        let ws_reciver_sink = Sink::new();
        let ws_receiver_stream = ws_reciver_sink.stream();

        let (ws_sender_tx, mut ws_sender_rx) = mpsc::channel(100);
        debug!("ws_sender_channel is closed 1: {}", ws_sender_tx.is_closed());

        runtime::Builder::new_current_thread()
            .enable_all()
            .build()?
            .block_on(async move {
                let (ws_stream, _res) = connect_async(url).await?;

                let (mut ws_sender_inner, mut ws_receiver_inner) = ws_stream.split();

                debug!("spawning ws_recv_task");
                let ws_recv_task = tokio::spawn(async move {
                    while let Some(Ok(Message::Text(msg))) = ws_receiver_inner.next().await {
                        ws_reciver_sink.send(msg);
                    }
                });
                debug!("spawning ws_send_task");

                let ws_send_task = tokio::spawn(async move {
                    debug!("moving ws_sender_rx handle");
                    while let Some(msg) = ws_sender_rx.recv().await {
                        if ws_sender_inner.send(Message::Text(msg)).await.is_err() {
                            ws_recv_task.abort();
                        }
                        debug!("dropping ws_sender_rx handle");
                    }
                });
                Ok::<(), BoxErr>(())
            })?;
            debug!("ws_sender_channel is closed 2: {}", ws_sender_tx.is_closed());
        Ok((ws_sender_tx, ws_receiver_stream))
    }

Output

[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] connecting to websocket ws://127.0.0.1:8000/node/test-a
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 1: false
[2022-12-13T11:58:45Z DEBUG tungstenite::handshake::client] Client handshake done.
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_recv_task
[2022-12-13T11:58:45Z DEBUG iot] spawning ws_send_task
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed 2: true
[2022-12-13T11:58:45Z DEBUG reqwest::connect] starting new connection: http://127.0.0.1:8000/
[2022-12-13T11:58:45Z DEBUG iot] ws_sender_channel is closed: true
thread 'tests::test_connect' panicked at 'called `Result::unwrap()` on an `Err` value: SendError("Some nights")', iot/src/lib.rs:199:50
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

What am I missing? Please enlighten me.

Upvotes: 3

Views: 1284

Answers (1)

Sergey O
Sergey O

Reputation: 1

I stumbled upon exactly same situation and nearly identical code :-)

 let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);

solution was to instantiate channel within async runtime

[tokio::main] // <---------------------------
async fn main() -> Result<(), Box<dyn Error>> {
    ...
    let (tx, rx) = tokio::sync::mpsc::channel::<Pos2>(5);

    // async code
    comm::connect(Handle::current(), rx);

    ...
    // sync code here
    ...
}

--- async code ---

pub fn connect(runtime: Handle, mut rx: Receiver<Pos2>) {
    thread::spawn(move || {
        runtime.block_on(async {
               ...
        })
    })

Upvotes: 0

Related Questions