osa1

Reputation: 7088

"blocking annotated I/O must be called from the context of the Tokio runtime" when reading stdin in async task

I'm trying to read from stdin in an async task spawned with tokio::spawn. The executor is created as

let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

The main task is then run with executor.spawn(...), and it spawns other tasks with tokio::spawn().

fn main then calls executor.run().unwrap(); to wait for all tasks to finish.

The problem is when I do

let mut stdin = tokio::io::stdin();
let mut read_buf: [u8; 1024] = [0; 1024];
...
stdin.read(&mut read_buf).await

I get the error "blocking annotated I/O must be called from the context of the Tokio runtime".

Dependencies:

futures-preview = { version = "0.3.0-alpha.18",  features = ["async-await", "nightly"] }
tokio = "0.2.0-alpha.2"
tokio-net = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"

Full code:

extern crate futures;
extern crate tokio;
extern crate tokio_net;
extern crate tokio_sync;

use std::io::Write;
use std::net::SocketAddr;
use tokio::io::AsyncReadExt;
use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
use tokio::net::TcpListener;
use tokio_sync::oneshot;

use futures::select;

use futures::future::FutureExt;

#[derive(Debug)]
enum AppErr {
    CantBindAddr(std::io::Error),
    CantAccept(std::io::Error),
}

fn main() {
    let mut executor = tokio::runtime::current_thread::Runtime::new().unwrap();

    executor.spawn(async {
        match server_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    executor.run().unwrap(); // ignores RunError
}

async fn server_task() -> Result<(), AppErr> {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    let mut listener = TcpListener::bind(&addr).map_err(AppErr::CantBindAddr)?;

    loop {
        print!("Waiting for incoming connection...");
        let _ = std::io::stdout().flush();
        let (socket, _) = listener.accept().await.map_err(AppErr::CantAccept)?;
        println!("{:?} connected.", socket);
        let (read, write) = socket.split();

        let (abort_in_task_snd, abort_in_task_rcv) = oneshot::channel();
        let (abort_out_task_snd, abort_out_task_rcv) = oneshot::channel();

        tokio::spawn(handle_incoming(read, abort_in_task_rcv, abort_out_task_snd));
        tokio::spawn(handle_outgoing(
            write,
            abort_out_task_rcv,
            abort_in_task_snd,
        ));
    }
}

async fn handle_incoming(
    mut conn: TcpStreamReadHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    println!("handle_incoming");

    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                // TODO match abort_ret {..}
                println!("abort signalled, handle_incoming returning");
                return;
            },
            bytes = conn.read(&mut read_buf).fuse() => {
                match bytes {
                    Err(io_err) => {
                        println!("io error when reading input stream: {:?}", io_err);
                        return;
                    }
                    Ok(bytes) => {
                        println!("read {} bytes: {:?}", bytes, &read_buf[0..bytes]);
                    }
                }
            }
        }
    }
}

async fn handle_outgoing(
    conn: TcpStreamWriteHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    println!("handle_outgoing");

    let mut stdin = tokio::io::stdin();
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                println!("abort signalled, handle_outgoing returning");
                return;
            }
            input = stdin.read(&mut read_buf).fuse() => {
                match input {
                    Err(io_err) => {
                        println!("io error when reading stdin: {:?}", io_err);
                        return;
                    }
                    Ok(bytes) => {
                        println!("handle_outgoing read {} bytes", bytes);
                        // TODO
                    }
                }
            },
        }
    }
}

Questions:

Thanks

Upvotes: 1

Views: 2095

Answers (1)

Ömer Erden

Reputation: 8803

Tokio's stdin blocks the executor pool thread it runs on, because it is annotated with blocking from tokio-executor. From the reference:

When the blocking function enters, it hands off the responsibility of processing the current work queue to another thread.

Your code does not work because the executor you used multiplexes all tasks on a single thread (tokio::runtime::current_thread::Runtime::new()). As a result, no other thread remains to run the executor's other tasks.

If you configure your runtime properly (a thread pool with multiple threads), your code will work fine:

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let mut executor = rt.executor();

    executor.spawn(async {
        match server_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    rt.shutdown_on_idle();
}
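
To illustrate the underlying point, that a blocking read needs a thread of its own so other work can keep running, here is a minimal sketch using only the standard library. It is not how Tokio implements stdin, and spawn_reader is a hypothetical helper name introduced for this example. It moves the blocking read onto a dedicated thread and hands each chunk back over a channel:

```rust
use std::io::Read;
use std::sync::mpsc;
use std::thread;

/// Reads `reader` on a dedicated thread and forwards each chunk over a channel.
/// `spawn_reader` is a hypothetical helper, not part of Tokio.
fn spawn_reader<R>(mut reader: R) -> mpsc::Receiver<Vec<u8>>
where
    R: Read + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut buf = [0u8; 1024];
        loop {
            match reader.read(&mut buf) {
                // EOF or an I/O error: stop. Dropping `tx` closes the channel.
                Ok(0) | Err(_) => break,
                Ok(n) => {
                    // Stop early if the receiving side has gone away.
                    if tx.send(buf[..n].to_vec()).is_err() {
                        break;
                    }
                }
            }
        }
    });
    rx
}
```

Calling spawn_reader(std::io::stdin()) would return a receiver that yields the bytes typed on stdin, while the calling thread stays free to do other work between calls to recv() or try_recv().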

See also: How can I stop reading from a tokio::io::lines stream?

Upvotes: 2
