Reputation: 509
I am learning to use Rust and making my first small program with tokio.
I have async functions for sending and receiving messages using tokio::sync::mpsc:
sender
async fn msg_stream(sender: mpsc::Sender<Message>) {
    let is = sender.is_closed();
    println!("it is closed : {}", is);
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let m = Message::new(get_random_string(), get_random_number());
        println!("message = {:?}", m);
        let is = sender.is_closed();
        println!("it is closed : {}", is);
        if let Err(e) = sender.send(m).await {
            println!("channel was closed,{}", e);
        }
    }
}
receiver
async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    let (_, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                return;
            }
            message = receiver.recv() => {
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}
Now I create a channel in main and pass its two halves to these functions:
#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Message>(8);
    tokio::join!(msg_stream(tx), read_stream(rx));
}
When I run it, I get an error:
channel was closed,channel closed
Also, the is_closed checks show that the channel is open at the beginning of the function, but by the first check inside the loop it is already closed.
it is closed : false
message = Message { content: "RIKiew", id: 96 }
it is closed : true
I can't figure out what's wrong. As I understand it, this can happen if rx gets dropped, but I don't see that happening here. I would appreciate help figuring out what is wrong.
Upvotes: 0
Views: 1452
Reputation: 22611
Here's a minimal reproducible example of your problem:
use std::{
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Message {
    number: u32,
}

impl Message {
    fn new(number: u32) -> Self {
        Self { number }
    }
}

fn get_random_number() -> u32 {
    static NUM: AtomicU32 = AtomicU32::new(1);
    NUM.fetch_add(1, Ordering::Relaxed)
}

async fn msg_stream(sender: mpsc::Sender<Message>) {
    let is = sender.is_closed();
    println!("it is closed : {}", is);
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let m = Message::new(get_random_number());
        println!("message = {:?}", m);
        let is = sender.is_closed();
        println!("it is closed : {}", is);
        if let Err(e) = sender.send(m).await {
            println!("channel was closed,{}", e);
        }
    }
}

async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    let (_, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                println!("STOP RECEIVING");
                return;
            }
            message = receiver.recv() => {
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Message>(8);
    tokio::join!(msg_stream(tx), read_stream(rx));
}
Note the line where I added STOP RECEIVING.
The output is:
it is closed : false
STOP RECEIVING
message = Message { number: 1 }
it is closed : true
channel was closed,channel closed
...
So why does this happen?
Let's focus on this function:
async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    let (_, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                println!("STOP RECEIVING");
                return;
            }
            message = receiver.recv() => {
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}
A few explanations of the concepts used in this function:
- tokio::select! jumps into the code whose future finishes first, and cancels the other branches
- tokio::time::timeout waits until its given future (here &mut rx) triggers, or until the timeout is over
So the question is: why does it jump into the "STOP RECEIVING" part? The 10 seconds are definitely not over yet.
That means that &mut rx got triggered. And that is exactly what happens, because a oneshot receiver can be triggered for two reasons:
- a value was sent on the channel
- its sender was dropped
And because you immediately drop the sender (by assigning it to _), the receiver completes immediately.
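The two completion cases can be reproduced without an async runtime. The following is a minimal sketch that swaps in the standard library's blocking std::sync::mpsc channel as a stand-in for tokio's oneshot channel, purely to keep the example dependency-free. The function names are hypothetical. The closing rule it illustrates is the same one at work above: once the sender is gone, the receiver stops waiting and reports an error, much as awaiting a tokio oneshot receiver yields an Err after its sender has been dropped.

```rust
use std::sync::mpsc::{self, TryRecvError};

/// Sender discarded with `_`: it is dropped at once, so the channel is closed.
fn sender_discarded() -> Result<(), TryRecvError> {
    let (_, rx) = mpsc::channel::<()>();
    rx.try_recv()
}

/// Sender bound to `_tx`: it stays alive, so the channel is open but empty.
fn sender_kept() -> Result<(), TryRecvError> {
    let (_tx, rx) = mpsc::channel::<()>();
    rx.try_recv()
}

/// A value was sent: the receiver completes with that value.
fn value_sent() -> Result<(), TryRecvError> {
    let (tx, rx) = mpsc::channel::<()>();
    tx.send(()).expect("receiver is still alive");
    rx.try_recv()
}

fn main() {
    println!("discarded: {:?}", sender_discarded()); // Err(Disconnected)
    println!("kept:      {:?}", sender_kept()); // Err(Empty)
    println!("sent:      {:?}", value_sent()); // Ok(())
}
```

Only the middle case leaves the receiver waiting, and that is the state read_stream needs for its 10 second timeout to mean anything.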
I'm unsure how to help you except pointing out the program flow, because it isn't clear what you are trying to achieve with the oneshot. If you intend it for cancellation reasons, just keep the sender around and this won't happen. You can achieve this by giving it a name.
The thing you might have stumbled across here is that _ is not a variable name. It is a wildcard pattern that binds nothing, so the sender is never moved into a variable and is dropped at the end of the let statement. _tx would be a proper variable name.
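The difference between _ and a name that merely starts with an underscore can be observed directly with a type that records when it is dropped. This is a small sketch using only the standard library; Tracked, Log, pair, with_wildcard and with_name are hypothetical names made up for the illustration, and pair stands in for oneshot::channel() returning (tx, rx).

```rust
use std::cell::RefCell;
use std::rc::Rc;

type Log = Rc<RefCell<Vec<String>>>;

/// Hypothetical helper: writes its name into a shared log when dropped.
struct Tracked {
    name: &'static str,
    log: Log,
}

impl Drop for Tracked {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("drop {}", self.name));
    }
}

/// Stand-in for a channel constructor that returns a (tx, rx) pair.
fn pair(log: &Log) -> (Tracked, Tracked) {
    (
        Tracked { name: "tx", log: Rc::clone(log) },
        Tracked { name: "rx", log: Rc::clone(log) },
    )
}

/// `_` binds nothing, so "tx" is dropped as soon as the `let` statement ends.
fn with_wildcard() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    {
        let (_, _rx) = pair(&log);
        log.borrow_mut().push("end of scope".to_string());
    }
    let events = log.borrow().clone();
    events
}

/// `_tx` is a real variable, so "tx" lives until the enclosing block ends.
fn with_name() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    {
        let (_tx, _rx) = pair(&log);
        log.borrow_mut().push("end of scope".to_string());
    }
    let events = log.borrow().clone();
    events
}

fn main() {
    println!("wildcard: {:?}", with_wildcard());
    println!("named:    {:?}", with_name());
}
```

With the wildcard, "drop tx" is logged before "end of scope". With the named binding, "end of scope" is logged first and both drops follow it.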
Here is one possible working version:
use std::{
    sync::atomic::{AtomicU32, Ordering},
    time::Duration,
};
use tokio::sync::{mpsc, oneshot};

#[derive(Debug)]
struct Message {
    number: u32,
}

impl Message {
    fn new(number: u32) -> Self {
        Self { number }
    }
}

fn get_random_number() -> u32 {
    static NUM: AtomicU32 = AtomicU32::new(1);
    NUM.fetch_add(1, Ordering::Relaxed)
}

async fn msg_stream(sender: mpsc::Sender<Message>) {
    let is = sender.is_closed();
    println!("it is closed : {}", is);
    loop {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let m = Message::new(get_random_number());
        println!("message = {:?}", m);
        let is = sender.is_closed();
        println!("it is closed : {}", is);
        if let Err(e) = sender.send(m).await {
            println!("channel was closed,{}", e);
        }
    }
}

async fn read_stream(mut receiver: mpsc::Receiver<Message>) {
    // Binding the sender to `_tx` keeps it alive until this function returns.
    let (_tx, mut rx) = oneshot::channel::<()>();
    loop {
        tokio::select! {
            _ = tokio::time::timeout(Duration::from_secs(10), &mut rx) => {
                println!("STOP RECEIVING");
                return;
            }
            message = receiver.recv() => {
                println!("was receiver message = {:?} ", message)
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Message>(8);
    tokio::join!(msg_stream(tx), read_stream(rx));
}
The output is:
it is closed : false
message = Message { number: 1 }
it is closed : false
was receiver message = Some(Message { number: 1 })
message = Message { number: 2 }
it is closed : false
was receiver message = Some(Message { number: 2 })
message = Message { number: 3 }
it is closed : false
was receiver message = Some(Message { number: 3 })
message = Message { number: 4 }
it is closed : false
was receiver message = Some(Message { number: 4 })
message = Message { number: 5 }
it is closed : false
was receiver message = Some(Message { number: 5 })
...
Upvotes: 4