I'm experimenting with how to stop asynchronous TCP connections and packet reading using Rust's `tokio`. I've written a way to stop the loop on CTRL+C or a timeout event using a channel and `select!`, but a move occurs on `select!` in the loop and it fails to compile.
use std::time::Duration;
use tokio;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (src, dst) = oneshot::channel();
    tokio::spawn(async {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => (),
            _ = tokio::time::sleep(Duration::from_secs(10)) => ()
        }
    });
    let _ = connect(dst);
    eprintln!("progoram finished");
    Ok(())
}

async fn connect(shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
    loop {
        tokio::select! {
            biased;
            _ = shutdown => return None,
            r = tokio::net::TcpStream::connect("127.0.0.1:80") => {
                match r {
                    Ok(stream) => return Some(stream),
                    Err(err) => {
                        eprintln!("ERROR: {:?}", err);
                        tokio::time::sleep(Duration::from_secs(2)).await;
                    }
                }
            }
        }
    }
}
error[E0382]: use of moved value: `shutdown`
  --> src/main.rs:23:17
   |
20 | async fn connect(shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
   |                  -------- move occurs because `shutdown` has type `tokio::sync::oneshot::Receiver<()>`, which does not implement the `Copy` trait
...
23 |             _ = shutdown => return None,
   |                 ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.
I made a few changes but could not resolve the error.

- `_ = &shutdown => return None` → the trait `Future` is not implemented for `&tokio::sync::oneshot::Receiver<()>` (which is fair enough).
- `_ = shutdown.into_future() => return None` with `use std::future::IntoFuture;` → unstable library feature.

How should I write a way to gracefully stop such an asynchronous retry loop from the outside?
Upvotes: 6
Views: 3646
Reputation: 121
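Adding `&mut` to the receiver where it is used in `select!` works without causing a move. A mutable reference to a future is itself a future as long as the referenced type is `Unpin`, which `oneshot::Receiver` is, so `select!` only borrows the receiver on each iteration. This is also mentioned in the "Resuming an async operation" section of the `select!` tutorial.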
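As a rough illustration of why borrowing helps, here is a minimal tokio-free sketch. `Countdown`, `NoopWake`, `poll_once` and `iterations_until_ready` are hypothetical names invented for this example; `poll_once` takes its future by value, which is how I assume a `select!` branch treats the expression it is given, so this is an analogy rather than what the macro literally expands to.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for a shutdown receiver: ready after `remaining` polls.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            Poll::Pending
        }
    }
}

/// Waker that does nothing; enough for polling by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once, taking ownership of whatever it is given.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}

/// Counts how many loop iterations pass before `shutdown` completes.
fn iterations_until_ready(mut shutdown: Countdown) -> u32 {
    let mut iterations = 0;
    loop {
        // `poll_once(shutdown)` would move the value on the first iteration
        // (E0382 on the second); `&mut shutdown` only lends it out, and
        // `&mut Countdown` is a future because `Countdown` is `Unpin`.
        if poll_once(&mut shutdown).is_ready() {
            return iterations;
        }
        iterations += 1;
    }
}
```

The state inside `shutdown` carries over from one iteration to the next because only a reference is handed out each time. The full program below applies the same idea to the `oneshot::Receiver`.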
use std::time::Duration;
use tokio;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (src, dst) = oneshot::channel();
    // Watcher task: wait for CTRL+C or a 5-second timeout, then signal shutdown.
    tokio::spawn(async {
        tokio::select! {
            _ = tokio::signal::ctrl_c() => (),
            _ = tokio::time::sleep(Duration::from_secs(5)) => ()
        }
        eprintln!("interrupting");
        src.send(()).unwrap();
    });
    let _ = connect(dst).await;
    eprintln!("progoram finished");
    Ok(())
}

async fn connect(mut shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
    loop {
        tokio::select! {
            // Check the branches in order, so shutdown takes priority.
            biased;
            // `&mut` borrows the receiver instead of moving it into `select!`.
            _ = &mut shutdown => return None,
            r = tokio::net::TcpStream::connect("127.0.0.1:80") => {
                match r {
                    Ok(stream) => return Some(stream),
                    Err(err) => {
                        eprintln!("ERROR: {:?}", err);
                        // Pause before the next connection attempt.
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                }
            }
        }
    }
}
With nothing listening on port 80, the code above keeps retrying until the interrupt stops it after 5 seconds:
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
interrupting
progoram finished