Reputation: 535
How can I have one part of my application read user input while also listening for a shutdown signal?
According to the Tokio docs, for this kind of thing I should use blocking IO in a spawned task:
For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.
So that is what I did, with something like this:
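use std::io::{self, BufRead};
use tokio::sync::watch;

async fn read_input(mut rx: watch::Receiver<&str>) {
    let mut line = String::new();
    let stdin = io::stdin();
    loop {
        // Blocking read: nothing else in this task runs until a line arrives.
        stdin.lock().read_line(&mut line).expect("Could not read line");
        // trim_end replaces the deprecated trim_right.
        let op = line.trim_end();
        if op == "EXIT" {
            break;
        } else if op == "send" {
            // send_stuff();
        }
        line.clear();
    }
}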
The thing is, how can I check the receiver channel for a shutdown and break out of this loop? If I await on it, the code will block.
Am I approaching this with the wrong concept/architecture?
Upvotes: 1
Views: 2856
Reputation: 6300
To avoid managing your own thread, you would need a non-blocking OS API for stdin, wrapped for tokio (tokio::io::Stdin in 1.12 uses a blocking variant).
Otherwise, if we follow the advice from the docs and spawn our own thread, this is how it could be done:
fn start_reading_stdin_lines(
    sender: tokio::sync::mpsc::Sender<String>,
    runtime: tokio::runtime::Handle
) {
    // Dedicated OS thread: the blocking reads never stall the tokio runtime.
    std::thread::spawn(move || {
        let stdin = std::io::stdin();
        let mut line_buf = String::new();
        while let Ok(bytes_read) = stdin.read_line(&mut line_buf) {
            // read_line returns Ok(0) at end of input; stop instead of looping forever.
            if bytes_read == 0 {
                break;
            }
            let line = line_buf.trim_end().to_string();
            line_buf.clear();
            let sender2 = sender.clone();
            // Forward the line to the async side from a task on the runtime.
            runtime.spawn(async move {
                let result = sender2.send(line).await;
                if let Err(error) = result {
                    println!("start_reading_stdin_lines send error: {:?}", error);
                }
            });
        }
    });
}
fn start_activity_until_shutdown(watch_sender: tokio::sync::watch::Sender<bool>) {
    tokio::spawn(async move {
        tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
        println!("exiting after a signal...");
        let result = watch_sender.send(true);
        if let Err(error) = result {
            println!("watch_sender send error: {:?}", error);
        }
    });
}
async fn read_input(
    mut line_receiver: tokio::sync::mpsc::Receiver<String>,
    mut watch_receiver: tokio::sync::watch::Receiver<bool>
) {
    loop {
        // Wait for whichever comes first: a line of input or the shutdown signal.
        tokio::select! {
            Some(line) = line_receiver.recv() => {
                println!("line: {}", line);
                // process the input
                match line.as_str() {
                    "exit" => {
                        println!("exiting manually...");
                        break;
                    },
                    "send" => {
                        println!("send_stuff");
                    }
                    unexpected_line => {
                        println!("unexpected command: {}", unexpected_line);
                    }
                }
            }
            Ok(_) = watch_receiver.changed() => {
                println!("shutdown");
                break;
            }
            // Both channels are closed, so nothing more can arrive.
            // Without an else branch, select! panics in that case.
            else => break,
        }
    }
}
#[tokio::main]
async fn main() {
    let (line_sender, line_receiver) = tokio::sync::mpsc::channel(1);
    start_reading_stdin_lines(line_sender, tokio::runtime::Handle::current());
    let (watch_sender, watch_receiver) = tokio::sync::watch::channel(false);
    // this will send a shutdown signal at some point
    start_activity_until_shutdown(watch_sender);
    read_input(line_receiver, watch_receiver).await;
}
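The core of the approach above is "blocking reads on their own thread, lines forwarded over a channel", and that part does not depend on tokio. Here is a dependency-free sketch of the same pattern that is easy to test. It rests on a few assumptions: it swaps tokio's mpsc and watch channels for std::sync::mpsc and an AtomicBool flag, it polls with recv_timeout instead of using select!, and the names spawn_line_reader, run_commands and Stop are made up for illustration.

```rust
use std::io::{BufRead, Cursor};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Why the consumer loop stopped (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Stop {
    ExitCommand,
    Shutdown,
    InputClosed,
}

/// Spawns a thread that does the blocking reads and forwards trimmed lines.
/// Generic over the reader so it can be driven by in-memory input in tests.
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // `lines()` ends at EOF; also stop on a read error or if the receiver is gone.
        for line in reader.lines() {
            match line {
                Ok(line) => {
                    if tx.send(line.trim_end().to_string()).is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
        // `tx` is dropped here, which the consumer sees as a disconnected channel.
    });
    rx
}

/// Handles lines until "exit" arrives, the shutdown flag is set, or input ends.
/// Returns the lines that were handled and the reason for stopping.
fn run_commands(lines: &Receiver<String>, shutdown: &AtomicBool) -> (Vec<String>, Stop) {
    let mut handled = Vec::new();
    loop {
        if shutdown.load(Ordering::SeqCst) {
            return (handled, Stop::Shutdown);
        }
        // Wake up periodically so the shutdown flag is re-checked while idle.
        match lines.recv_timeout(Duration::from_millis(50)) {
            Ok(line) if line == "exit" => return (handled, Stop::ExitCommand),
            Ok(line) => handled.push(line),
            Err(RecvTimeoutError::Timeout) => continue,
            Err(RecvTimeoutError::Disconnected) => return (handled, Stop::InputClosed),
        }
    }
}

fn main() {
    let shutdown = AtomicBool::new(false);
    // In-memory input for the demo; for a terminal, pass
    // std::io::BufReader::new(std::io::stdin()) instead.
    let lines = spawn_line_reader(Cursor::new("send\nexit\nignored\n"));
    let (handled, stop) = run_commands(&lines, &shutdown);
    println!("handled {:?}, stopped by {:?}", handled, stop);
}
```

The trade-off compared with select! is latency: the loop notices a shutdown only when the timeout fires or a line arrives, so the 50 ms interval is a guess you would tune. As in the tokio version, a read that is already blocked on the terminal cannot be cancelled; the reader thread simply ends when the process does.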
Potential improvements:
select!
Upvotes: 2