oblivionist
oblivionist

Reputation: 121

How to await a Future in tokio::select! without moving it?

I'm experimenting with how to stop asynchronous TCP connections and packet reading using Rust's tokio. I’ve written a way to stop the loop on CTRL+C or timeout event using channel and select, but a move occurs on select in the loop and it fails to compile.

use std::time::Duration;
use tokio;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (src, dst) = oneshot::channel();
    tokio::spawn(async {
        tokio::select!{
            _ = tokio::signal::ctrl_c() => (),
            _ = tokio::time::sleep(Duration::from_secs(10)) => ()
        }
    });

    let _ = connect(dst);
    eprintln!("progoram finished");
    Ok(())
}

async fn connect(shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
    loop {
        tokio::select! {
            biased;
            _ = shutdown => return None,
            r = tokio::net::TcpStream::connect("127.0.0.1:80") => {
                match r {
                    Ok(stream) => return Some(stream),
                    Err(err) => {
                        eprintln!("ERROR: {:?}", err);
                        tokio::time::sleep(Duration::from_secs(2)).await;
                    }
                }
            }
        }
    }
}
error[E0382]: use of moved value: `shutdown`
  --> src/main.rs:23:17
   |
20 | async fn connect(shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
   |                  -------- move occurs because `shutdown` has type `tokio::sync::oneshot::Receiver<()>`, which does not implement the `Copy` trait
...
23 |             _ = shutdown => return None,
   |                 ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.

I made a few changes but could not resolve the error.

How should I write a way to gracefully stop such an asynchronous retry loop from the outside?

Upvotes: 6

Views: 3646

Answers (1)

oblivionist
oblivionist

Reputation: 121

Adding &mut when used in select! will work without causing a move. This is also mentioned in Resuming an async operation in the select! tutorial.

use std::time::Duration;
use tokio;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let (src, dst) = oneshot::channel();
    tokio::spawn(async {
        tokio::select!{
            _ = tokio::signal::ctrl_c() => (),
            _ = tokio::time::sleep(Duration::from_secs(5)) => ()
        }
        eprintln!("interrupting");
        src.send(()).unwrap();
    });

    let _ = connect(dst).await;
    eprintln!("progoram finished");
    Ok(())
}

async fn connect(mut shutdown: oneshot::Receiver<()>) -> Option<tokio::net::TcpStream> {
    loop {
        tokio::select! {
            biased;
            _ = &mut shutdown => return None,
            r = tokio::net::TcpStream::connect("127.0.0.1:80") => {
                match r {
                    Ok(stream) => return Some(stream),
                    Err(err) => {
                        eprintln!("ERROR: {:?}", err);
                        tokio::time::sleep(Duration::from_secs(1)).await;
                    }
                }
            }
        }
    }
}

The above code could be stopped by an interrupt after 5 seconds

ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
ERROR: Os { code: 111, kind: ConnectionRefused, message: "Connection refused" }
interrupting
progoram finished

Upvotes: 6

Related Questions