Reputation: 6029
I am replacing synchronous socket code written in Rust with the asynchronous equivalent using Tokio. Tokio uses futures for asynchronous activity so tasks are chained together and queued onto an executor to be executed by a thread pool.
The basic pseudocode for what I want to do is like this:
let listener = tokio::net::TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let in_buf = vec![0u8; 8192];
    // TODO this should happen continuously until an error happens
    let read_task = tokio::io::read(socket, in_buf).and_then(move |(socket, in_buf, bytes_read)| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
    }).map_err(|err| {
        // tokio::spawn needs a future whose error type is ()
        error!("Read error = {:?}", err);
    });
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);
This pseudocode would only execute my read task once. How do I run it continuously, so that it reads again each time the previous read completes? It should stop only if it panics or returns an error result. What's the simplest way of doing that?
Upvotes: 10
Views: 5726
Reputation: 1319
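A clean way to accomplish this without fighting the type system is to use the tokio-codec crate. If you want to treat the reader as a stream of bytes instead of defining your own codec, you can use tokio_codec::BytesCodec. Calling for_each on the resulting stream runs your closure for every chunk that arrives and finishes only when the connection closes or an error occurs.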
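use futures::{Future, Stream};
use tokio::codec::{BytesCodec, Decoder};
use tokio::net::TcpListener;
...
let listener = TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    // Frame the socket as raw byte chunks; split() yields (sink, stream).
    let (_writer, reader) = BytesCodec::new().framed(socket).split();
    // The closure runs once per chunk until the stream ends or errors.
    let read_task = reader.for_each(|bytes| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
    }).map_err(|err| {
        // tokio::spawn needs a future whose error type is ()
        error!("Read error = {:?}", err);
    });
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);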
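To make the stopping conditions concrete, here is a rough synchronous model of what iterating over the framed reader amounts to. It uses only the standard library and is not Tokio's API; the helper name for_each_chunk and its parameters are made up for illustration. It calls a closure for every chunk read and stops at end of input or at the first error, which is the behavior you asked for.

```rust
use std::io::{self, Read};

/// Hypothetical helper (not part of Tokio): calls `on_chunk` for every
/// non-empty read and returns the total byte count once the reader
/// reports end of input. Stops early on the first error from either
/// the reader or the callback.
fn for_each_chunk<R, F>(mut reader: R, buf_size: usize, mut on_chunk: F) -> io::Result<usize>
where
    R: Read,
    F: FnMut(&[u8]) -> io::Result<()>,
{
    let mut buf = vec![0u8; buf_size];
    let mut total = 0;
    loop {
        match reader.read(&mut buf) {
            // A zero-length read means the peer closed or input ended.
            Ok(0) => return Ok(total),
            Ok(n) => {
                on_chunk(&buf[..n])?;
                total += n;
            }
            // An interrupted read is not a real failure; try again.
            Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    // An in-memory cursor stands in for the socket in this sketch.
    let input = io::Cursor::new(b"hello world".to_vec());
    let mut chunks: Vec<Vec<u8>> = Vec::new();
    let total = for_each_chunk(input, 4, |chunk| {
        chunks.push(chunk.to_vec());
        Ok(())
    })
    .unwrap();
    println!("read {} bytes in {} chunks", total, chunks.len());
}
```

In the asynchronous version the loop is hidden inside the stream: each chunk is delivered to the for_each closure when it arrives, and the task completes when the stream ends or returns an error.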
Upvotes: 0