ludeed
ludeed

Reputation: 535

handling user input

How can I have some part of my application reading user input and also listening for a shutdown.

According to tokio-docs to do this type of things I should use blocking IO in a spawned task.

For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.

And so I did with something like this

async fn read_input(mut rx: watch::Receiver<&str>) {
    let mut line = String::new();
    let stdin = io::stdin();

    loop {
        stdin.lock().read_line(&mut line).expect("Could not read line");

        let op = line.trim_right();
        if op == "EXIT" {
            break;
        } else if op == "send" {
            // send_stuff();
        }
        line.clear();

    }
}

the thing is, how can I check the receiver channel for a shutdown and break this loop? If I await the code will block.

Am I approaching this with the wrong concept/architecture ?

Upvotes: 1

Views: 2856

Answers (1)

battlmonstr
battlmonstr

Reputation: 6300

Without managing your own thread, there has to be a way to use some non-blocking OS API on stdin and wrap it for tokio (tokio::io::Stdin 1.12 uses a blocking variant).

Otherwise if we follow the advice from the docs and spawn our own thread, this is how it could be done:

fn start_reading_stdin_lines(
    sender: tokio::sync::mpsc::Sender<String>,
    runtime: tokio::runtime::Handle
) {
    std::thread::spawn(move || {
        let stdin = std::io::stdin();
        let mut line_buf = String::new();
        while let Ok(_) = stdin.read_line(&mut line_buf) {
            let line = line_buf.trim_end().to_string();
            line_buf.clear();
            let sender2 = sender.clone();

            runtime.spawn(async move {
                let result = sender2.send(line).await;
                if let Err(error) = result {
                    println!("start_reading_stdin_lines send error: {:?}", error);
                }
            });
        }
    });
}

fn start_activity_until_shutdown(watch_sender: tokio::sync::watch::Sender<bool>) {
    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        println!("exiting after a signal...");
        let result = watch_sender.send(true);
        if let Err(error) = result {
            println!("watch_sender send error: {:?}", error);
        }
    });
}

async fn read_input(
    mut line_receiver: tokio::sync::mpsc::Receiver<String>,
    mut watch_receiver: tokio::sync::watch::Receiver<bool>
) {
    loop {
        tokio::select! {
            Some(line) = line_receiver.recv() => {
                println!("line: {}", line);
                // process the input
                match line.as_str() {
                    "exit" => {
                        println!("exiting manually...");
                        break;
                    },
                    "send" => {
                        println!("send_stuff");
                    }
                    unexpected_line => {
                        println!("unexpected command: {}", unexpected_line);
                    }
                }
            }
            Ok(_) = watch_receiver.changed() => {
                println!("shutdown");
                break;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (line_sender, line_receiver) = tokio::sync::mpsc::channel(1);
    start_reading_stdin_lines(line_sender, tokio::runtime::Handle::current());

    let (watch_sender, watch_receiver) = tokio::sync::watch::channel(false);
    // this will send a shutdown signal at some point
    start_activity_until_shutdown(watch_sender);

    read_input(line_receiver, watch_receiver).await;
}

Potential improvements:

  • if you are ok with tokio_stream wrappers, this could be combined more elegantly with start_reading_stdin_lines producing a stream of lines and mapping them to typed commands. read_input could be then based on StreamMap instead of select!.
  • enabling experimental stdin_forwarders feature makes reading lines easier with a for loop over lines()

Upvotes: 2

Related Questions