Reputation: 1771
I have a loop where I do some work and send the result with Sender. The work takes time and I need to retry it in case of failure. It's possible that while I retry, the receiver has been closed and my retries are a waste of time. Because of this, I need a way to check whether the Receiver is still available without sending a message.
In an ideal world, I want my code to look like this in pseudocode:
let (tx, rx) = tokio::sync::mpsc::channel(1);
tokio::spawn(async move {
// do some stuff with rx and drop it after some time
rx.recv(...).await;
});
let mut attempts = 0;
loop {
if tx.is_closed() {
break;
}
if let Ok(result) = do_work().await {
attempts = 0;
let _ = tx.send(result).await;
} else {
if attempts >= 10 {
break;
} else {
attempts += 1;
continue;
}
}
};
The problem is that Sender doesn't have an is_closed method. It does have pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), ClosedError>>, but I don't know what Context is or where I can find it.
When I don't have a value to send, how can I check if the sender is able to send?
Upvotes: 5
Views: 3890
Reputation: 6867
Sender has a try_send method:
Attempts to immediately send a message on this Sender.
This method differs from send by returning immediately if the channel's buffer is full or no receiver is waiting to acquire some data. Compared with send, this function has two failure cases instead of one (one for disconnection, one for a full buffer).
Use it instead of send and check for the error:
if let Err(TrySendError::Closed(_)) = tx.try_send(result) {
break;
}
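To make the two failure cases concrete, here is a minimal sketch. It uses the standard library's sync_channel instead of tokio so that it runs without extra crates; that is a substitution, not the tokio API. std's try_send has the same two failure cases, but it names the closed case Disconnected where tokio uses Closed. The Attempt enum and the attempt_send helper are hypothetical names introduced only for illustration.

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

/// Outcome of one non-blocking send attempt (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum Attempt {
    Sent,
    BufferFull,
    ReceiverGone,
}

/// Try to send without blocking and report which of the cases occurred.
fn attempt_send(tx: &SyncSender<i32>, value: i32) -> Attempt {
    match tx.try_send(value) {
        Ok(()) => Attempt::Sent,
        // The buffer is full, but the receiver still exists: worth retrying.
        Err(TrySendError::Full(_)) => Attempt::BufferFull,
        // std's name for the case tokio calls `Closed`: stop retrying.
        Err(TrySendError::Disconnected(_)) => Attempt::ReceiverGone,
    }
}

fn main() {
    let (tx, rx) = sync_channel::<i32>(1);
    println!("{:?}", attempt_send(&tx, 1)); // Sent
    println!("{:?}", attempt_send(&tx, 2)); // BufferFull
    let _ = rx.recv(); // drain the buffered value
    drop(rx); // the receiver goes away
    println!("{:?}", attempt_send(&tx, 3)); // ReceiverGone
}
```

Only the ReceiverGone case should end the retry loop; a full buffer just means the receiver has not caught up yet.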
It is possible to do what you want by using poll_fn from the futures crate. It adapts a function returning Poll into a Future:
use futures::future::poll_fn; // 0.3.5
use std::future::Future;
use tokio::sync::mpsc::{channel, error::ClosedError, Sender}; // 0.2.22
use tokio::time::delay_for; // 0.2.22
fn wait_until_ready<'a, T>(
sender: &'a mut Sender<T>,
) -> impl Future<Output = Result<(), ClosedError>> + 'a {
poll_fn(move |cx| sender.poll_ready(cx))
}
#[tokio::main]
async fn main() {
let (mut tx, mut rx) = channel::<i32>(1);
tokio::spawn(async move {
// Receive one value, then drop rx, which closes the channel
let val = rx.recv().await;
println!("{:?}", val);
});
wait_until_ready(&mut tx).await.unwrap();
tx.send(123).await.unwrap();
wait_until_ready(&mut tx).await.unwrap();
delay_for(std::time::Duration::from_secs(1)).await;
tx.send(456).await.unwrap(); // 456 likely never printed out,
// despite having a positive readiness response
// and the send "succeeding"
}
Note, however, that in the general case this is susceptible to TOCTOU (time-of-check to time-of-use) races. Even though Sender's poll_ready reserves a slot in the channel for later use, the receiving end can still be closed between the readiness check and the actual send. I tried to indicate this in the code.
Upvotes: 3
Reputation: 361749
Send a null message that the receiver ignores. It could be anything. For example, if you're sending T now, you could change it to Option<T> and have the receiver ignore Nones.
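A minimal sketch of this idea, assuming the standard library's unbounded channel in place of tokio's so that it runs without extra crates. The receiver_alive and drain_payloads helpers are hypothetical names, not part of any library. With a bounded channel such as the one in the question, a probe would occupy a buffer slot until the receiver reads it, so treat this as an illustration only.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Probe the channel with an empty message (hypothetical helper).
/// Returns true if the receiver still exists.
fn receiver_alive<T>(tx: &Sender<Option<T>>) -> bool {
    tx.send(None).is_ok()
}

/// Collect everything currently queued, skipping the `None` probes.
fn drain_payloads<T>(rx: &Receiver<Option<T>>) -> Vec<T> {
    rx.try_iter().flatten().collect()
}

fn main() {
    let (tx, rx) = channel::<Option<i32>>();

    // Probe before doing expensive work, then send the real result.
    assert!(receiver_alive(&tx));
    tx.send(Some(7)).unwrap();
    assert!(receiver_alive(&tx));

    // The receiver sees only the real payload; both probes are ignored.
    println!("{:?}", drain_payloads(&rx)); // [7]

    // Once the receiver is dropped, the probe fails and retries can stop.
    drop(rx);
    println!("{}", receiver_alive(&tx)); // false
}
```

The probe is still subject to the same race as any other check: the receiver can disappear right after a successful probe, so the result of the real send should be checked as well.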
Yeah, that will work, although I don't really like this approach since I need to change the communication format.
I wouldn't get hung up on the communication format. This isn't a well-defined network protocol that should be isolated from implementation details; it's an internal communication mechanism between two pieces of your own code.
Upvotes: 1