Cooper
Cooper

Reputation: 77

Receiver on tokio's mpsc channel only receives messages when buffer is full

I've spent a few hours trying to figure this out and I'm pretty done. I found the question with a similar name, but that looks like something was blocking synchronously which was messing with tokio. That very well may be the issue here, but I have absolutely no idea what is causing it.

Here is a heavily stripped down version of my project which hopefully gets the issue across.

use std::io;
use futures_util::{
    SinkExt,
    stream::{SplitSink, SplitStream},
    StreamExt,
};
use tokio::{
    net::TcpStream,
    sync::mpsc::{channel, Receiver, Sender},
};
use tokio_tungstenite::{
    connect_async,
    MaybeTlsStream,
    tungstenite::Message,
    WebSocketStream,
};

#[tokio::main]
async fn main() {
    connect_to_server("wss://a_valid_domain.com".to_string()).await;
}

async fn read_line() -> String {
    loop {
        let mut str = String::new();
        io::stdin().read_line(&mut str).unwrap();

        str = str.trim().to_string();
        if !str.is_empty() {
            return str;
        }
    }
}

async fn connect_to_server(url: String) {
    let (ws_stream, _) = connect_async(url).await.unwrap();

    let (write, read) = ws_stream.split();
    let (tx, rx) = channel::<ChannelMessage>(100);

    tokio::spawn(channel_thread(write, rx));
    tokio::spawn(handle_std_input(tx.clone()));

    read_messages(read, tx).await;
}

#[derive(Debug)]
enum ChannelMessage {
    Text(String),
    Close,
}


// PROBLEMATIC FUNCTION
async fn channel_thread(
    mut write: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
    mut rx: Receiver<ChannelMessage>,
) {
    while let Some(msg) = rx.recv().await {
        println!("{:?}", msg); // This only fires when buffer is full
        match msg {
            ChannelMessage::Text(text) => write.send(Message::Text(text)).await.unwrap(),
            ChannelMessage::Close => {
                write.close().await.unwrap();
                rx.close();

                return;
            }
        }
    }
}

async fn read_messages(
    mut read: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    tx: Sender<ChannelMessage>,
) {
    while let Some(msg) = read.next().await {
        let msg = match msg {
            Ok(m) => m,
            Err(_) => continue
        };

        match msg {
            Message::Text(m) => println!("{}", m),
            Message::Close(_) => break,
            _ => {}
        }
    }

    if !tx.is_closed() {
        let _ = tx.send(ChannelMessage::Close).await;
    }
}

async fn handle_std_input(tx: Sender<ChannelMessage>) {
    loop {
        let str = read_line().await;
        if tx.is_closed() {
            break;
        }

        tx.send(ChannelMessage::Text(str)).await.unwrap();
    }
}

As you can see, what I'm trying to do is:

The issue lies in the channel_thread() function. I move the websocket writer into this function as well as the channel receiver. The issue is, it only loops over the sent objects when the buffer is full.

I've spent a lot of time trying to solve this, any help is greatly appreciated.

Upvotes: 4

Views: 2061

Answers (2)

Metehan Yıldırım
Metehan Yıldırım

Reputation: 401

I initially came here for a separate issue, but I wanted to share some of my findings with you.

If you're sending messages in a tight loop without yielding back to the Tokio scheduler, the sender may monopolize the thread and prevent the receiver task from being scheduled to process messages. This can create the appearance that messages are only received when the buffer is full, particularly if the buffer size is small.

Using tokio::task::yield_now can resolve this issue.

Upvotes: 1

cdhowie
cdhowie

Reputation: 169123

Here, you make a blocking synchronous call in an async context:

async fn read_line() -> String {
    loop {
        let mut str = String::new();
        io::stdin().read_line(&mut str).unwrap();
        //          ^^^^^^^^^^^^^^^^^^^
        //          This is sync+blocking

        str = str.trim().to_string();
        if !str.is_empty() {
            return str;
        }
    }
}

You never ever make blocking synchronous calls in an async context, because that prevents the entire thread from running other async tasks. Your channel receiver task is likely also assigned to this thread, so it's having to wait until all the blocking calls are done and whatever invokes this function yields back to the async runtime.

Tokio has its own async version of stdin, which you should use instead.

Upvotes: 4

Related Questions