Alpharius
Alpharius

Reputation: 509

Why does tokio::mpsc::Sender close when entering the loop?

I am learning to use Rust and making my first small program with tokio.

I have async functions for sending and receiving messages using tokio::mpsc:

sender

async fn msg_stream(sender : mpsc::Sender<Message>) {
    let is = sender.is_closed();
    println!("it is closed : {}",is);
    loop{
        tokio::time::sleep(Duration::from_secs(1)).await;
        let m = Message::new(get_random_string(),get_random_number());
        println!("message = {:?}",m);
        let is = sender.is_closed();
        println!("it is closed : {}",is);
        if let Err(e) = sender.send(m).await{
            println!("channel was closed,{}",e);
        }
    }
}

receiver

async fn read_stream(mut receiver : mpsc::Receiver<Message>){
    let (_, mut rx) = oneshot::channel::<()>();
    loop{
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10),(& mut rx)) => {
                return;
            }
            message = receiver.recv() =>{
                println!("was receiver message = {:?} ",message)
            }
        }
    }

}


Now I create channels in main and send them to these functions:

#[tokio::main]
async fn main() {
    let (tx,rx) = mpsc::channel::<Message>(8);
    tokio::join!(msg_stream(tx),read(rx));
}

Now when I start I get an error:

channel was closed,channel closed

Also, checks for channel closeness show that at the beginning of the function the channel was opened , but the first check inside the loop and the channel is already closed.

it is closed : false
message = Message { content: "RIKiew", id: 96 }
it is closed : true

I can't figure out what's wrong. As I understand it, this can happen if rx gets dropped, but I don't see it happening here. I will be glad of help to figure out what is wrong with

Upvotes: 0

Views: 1452

Answers (1)

Finomnis
Finomnis

Reputation: 22611

Here's a minimal reproducible example of your problem:

use std::{
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Message {
    number: u32,
}
impl Message {
    fn new(number: u32) -> Self {
        Self { number }
    }
}

fn get_random_number() -> u32 {
    static NUM: AtomicU32 = AtomicU32::new(1);
    NUM.fetch_add(1, Ordering::Relaxed)
}

async fn msg_stream(sender: mpsc::Sender<Message>) {
    let is = sender.is_closed();
    println!("it is closed : {}", is);
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let m = Message::new(get_random_number());
        println!("message = {:?}", m);
        let is = sender.is_closed();
        println!("it is closed : {}", is);
        if let Err(e) = sender.send(m).await {
            println!("channel was closed,{}", e);
        }
    }
}

async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    let (_, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                println!("STOP RECEIVING");
                return;
            }
            message = receiver.recv() =>{
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Message>(8);
    tokio::join!(msg_stream(tx), read_stream(rx));
}

Note the line where I added STOP RECEIVING.

The output is:

it is closed : false
STOP RECEIVING
message = Message { number: 1 }
it is closed : true
channel was closed,channel closed
...

So why does this happen?

Let's focus on this function:

async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    let (_, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                println!("STOP RECEIVING");
                return;
            }
            message = receiver.recv() =>{
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}

A few explanations of the concepts used in this function:

  • tokio::select! jumps into the code whose future finishes first, and cancels the other branches
  • tokio::time::timeout waits until its given future (here &mut rx) triggers, or until the timeout is over

So the question is: why does it jump into the "STOP_RECEIVING" part? The 10 seconds are definitely not over yet.

That means that &mut rx got triggered. And that is absolutely what happens, because a oneshot receiver can be triggered for two reasons:

  • It receives a value
  • Its sender got dropped

And because you immediately drop the sender (via assigning it to _), the receiver will return immediately.

I'm unsure how to help you except pointing out the program flow, because it isn't clear what you are trying to achieve with the oneshot. If you intend it for cancellation reasons, just keep the sender around and this won't happen. You can achieve this by giving it a name.

The thing you might have stumbled across here is that _ is not a variable name. It is a special keyword for indicating that this variable is to be dropped immediately. _rx would be a proper variable name.

Here is one possible working version:

use std::{
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};

use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Message {
    number: u32,
}
impl Message {
    fn new(number: u32) -> Self {
        Self { number }
    }
}

fn get_random_number() -> u32 {
    static NUM: AtomicU32 = AtomicU32::new(1);
    NUM.fetch_add(1, Ordering::Relaxed)
}

async fn msg_stream(sender: mpsc::Sender<Message>) {
    let is = sender.is_closed();
    println!("it is closed : {}", is);
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let m = Message::new(get_random_number());
        println!("message = {:?}", m);
        let is = sender.is_closed();
        println!("it is closed : {}", is);
        if let Err(e) = sender.send(m).await {
            println!("channel was closed,{}", e);
        }
    }
}

async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    let (_tx, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                println!("STOP RECEIVING");
                return;
            }
            message = receiver.recv() =>{
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Message>(8);
    tokio::join!(msg_stream(tx), read_stream(rx));
}
it is closed : false
message = Message { number: 1 }
it is closed : false
was receiver message = Some(Message { number: 1 }) 
message = Message { number: 2 }
it is closed : false
was receiver message = Some(Message { number: 2 }) 
message = Message { number: 3 }
it is closed : false
was receiver message = Some(Message { number: 3 }) 
message = Message { number: 4 }
it is closed : false
was receiver message = Some(Message { number: 4 }) 
message = Message { number: 5 }
it is closed : false
was receiver message = Some(Message { number: 5 }) 
...

Upvotes: 4

Related Questions