locka

Reputation: 6029

How do I schedule a repeating task in Tokio?

I am replacing synchronous socket code written in Rust with an asynchronous equivalent using Tokio. Tokio uses futures for asynchronous work, so tasks are chained together and queued onto an executor, which runs them on a thread pool.

The basic pseudocode for what I want to do is like this:

let listener = tokio::net::TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let in_buf = vec![0u8; 8192];
    // TODO this should happen continuously until an error happens
    let read_task = tokio::io::read(socket, in_buf).and_then(move |(socket, in_buf, bytes_read)| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
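    }).map_err(|err| {
        // tokio::spawn needs a future whose error type is (), so log and discard the error
        error!("Read error = {:?}", err);
    });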
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);

This pseudocode executes my task only once. How do I run it continuously, so that it executes again and again? It should stop only if it panics or returns an error. What's the simplest way to do that?

Upvotes: 10

Views: 5726

Answers (2)

Nicholas Rishel

Reputation: 1319

A clean way to accomplish this without fighting the type system is to use the tokio-codec crate. If you want to interact with the reader as a stream of bytes instead of defining a codec, you can use tokio_codec::BytesCodec.

use tokio::codec::Decoder;
use futures::Stream;
...

let listener = tokio::net::TcpListener::bind(&sock_addr).unwrap();
let server_task = listener.incoming().for_each(move |socket| {
    let (_writer, reader) = tokio_codec::BytesCodec::new().framed(socket).split();
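    let read_task = reader.for_each(|bytes| {
        /* ... Logic I want to happen repeatedly as bytes are read ... */
        Ok(())
    }).map_err(|err| {
        // tokio::spawn needs a future whose error type is (), so log and discard the error
        error!("Read error = {:?}", err);
    });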
    tokio::spawn(read_task);
    Ok(())
}).map_err(|err| {
    error!("Accept error = {:?}", err);
});
tokio::run(server_task);

Upvotes: 0

Joel Dice

Reputation: 121

Using loop_fn should work:

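use futures::future::{loop_fn, Loop};

let read_task =
    loop_fn((socket, in_buf, 0), |(socket, in_buf, bytes_read)| {
        if bytes_read > 0 { /* handle bytes */ }

        // A read of zero bytes means the peer closed the connection, so stop looping.
        tokio::io::read(socket, in_buf).map(|(socket, in_buf, bytes_read)| {
            if bytes_read == 0 { Loop::Break(()) } else { Loop::Continue((socket, in_buf, bytes_read)) }
        })
    });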
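
To make the control flow of loop_fn easier to see, here is a minimal synchronous sketch that uses only the standard library. It is not the futures crate's implementation: the Loop enum below only mirrors the Continue and Break variants of futures 0.1, and run_loop and count_bytes are hypothetical names made up for this illustration. The body is called with the current state; Continue feeds the next state back in, Break ends the loop with a final value, and an error ends it immediately.

```rust
/// Mirrors the two variants of `futures::future::Loop` (futures 0.1).
enum Loop<T, S> {
    /// Stop looping and yield the final value.
    Break(T),
    /// Run the body again with the next state.
    Continue(S),
}

/// Synchronous stand-in for `loop_fn` (hypothetical helper):
/// calls `body` until it returns `Break` or an error.
fn run_loop<S, T, E, F>(initial: S, mut body: F) -> Result<T, E>
where
    F: FnMut(S) -> Result<Loop<T, S>, E>,
{
    let mut state = initial;
    loop {
        match body(state)? {
            Loop::Break(value) => return Ok(value),
            Loop::Continue(next) => state = next,
        }
    }
}

/// Hypothetical body: "reads" `input` in chunks of at most `chunk` bytes,
/// counts the total, and breaks when a read yields zero bytes (end of input).
fn count_bytes(input: &[u8], chunk: usize) -> Result<usize, String> {
    if chunk == 0 {
        return Err("chunk size must be non-zero".to_string());
    }
    run_loop((0usize, 0usize), |(offset, total)| {
        // Number of bytes this simulated read returns.
        let n = chunk.min(input.len() - offset);
        if n == 0 {
            Ok(Loop::Break(total))
        } else {
            Ok(Loop::Continue((offset + n, total + n)))
        }
    })
}
```

With the real loop_fn the body returns a future instead of a Result, but the state threading works the same way: whatever you pass to Continue becomes the argument of the next iteration.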

Upvotes: 1
