porton
porton

Reputation: 5805

How do I run an asynchronous task periodically and also sometimes on demand?

I have a task (downloading something from the Web) that runs regularly with pauses 10 min between runs.

If my program notices that the data is outdated, then it should run the download task immediately unless it is already running. If the download task happened out-of-time, the next task should be after 10 min since the out-of-time task so all future tasks and pauses are shifted later in time.

How do I do this with Tokio?

I made a library to run a sequence of tasks, but trying to use it for my problem failed.

mod tasks_with_regular_pauses;

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::spawn;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_interruptible_future::{
    interruptible, interruptible_sendable, interruptible_straight, InterruptError,
};

pub type TaskItem = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Execute futures from a stream of futures in order in a Tokio task. Not tested code.
pub struct TaskQueue {
    tx: Sender<TaskItem>,
    rx: Arc<Mutex<Receiver<TaskItem>>>,
}

impl TaskQueue {
    pub fn new() -> Self {
        let (tx, rx) = channel(1);
        Self {
            tx,
            rx: Arc::new(Mutex::new(rx)),
        }
    }
    async fn _task(this: Arc<Mutex<Self>>) {
        // let mut rx = ReceiverStream::new(rx);
        loop {
            let this2 = this.clone();
            let fut = {
                // block to shorten locks lifetime
                let obj = this2.lock().await;
                let rx = obj.rx.clone();
                let mut rx = rx.lock().await;
                rx.recv().await
            };
            if let Some(fut) = fut {
                fut.await;
            } else {
                break;
            }
        }
    }
    pub fn spawn(
        this: Arc<Mutex<Self>>,
        notify_interrupt: async_channel::Receiver<()>,
    ) -> JoinHandle<Result<(), InterruptError>> {
        spawn(interruptible_straight(notify_interrupt, async move {
            Self::_task(this).await;
            Ok(())
        }))
    }
    pub async fn push_task(&self, fut: TaskItem) {
        let _ = self.tx.send(fut).await;
    }
}

Upvotes: 3

Views: 2089

Answers (1)

battlmonstr
battlmonstr

Reputation: 6300

I'd recommend using select! instead of interruptible futures to detect one of 3 conditions in your loop:

  • download task is finished
  • the data is outdated signal
  • data expired timeout signal

"The data is outdated" signal can be conveyed using a dedicated channel.

select! allows waiting for futures (like downloading and timeouts), and reading from channels at the same time. See the tutorial for examples of that.

Solution sketch:

loop {
    // it is time to download
    let download_future = ...; // make your URL request
    let download_result = download_future.await;

    // if the outdated signal is generated while download
    // was in progress, ignore the signal by draining the receiver
    while outdated_data_signal_receiver.try_recv().is_ok() {}

    // send results upstream for processing
    download_results_sender.send(download_result); 

    // wait to re-download
    select! {
        // after a 10 min pause
        _ = sleep(Duration::from_minutes(10)) => break,
        // or by an external signal
        _ = outdated_data_signal_receiver.recv() => break,
    }
}

This logic can be simplified further by the timeout primitive:

loop {
    // it is time to download
    let download_future = ...; // make your URL request
    let download_result = download_future.await;

    // if the outdated signal is generated while download
    // was in progress, ignore the signal by draining the receiver
    while outdated_data_signal_receiver.try_recv().is_ok() {}

    // send results upstream for processing
    download_results_sender.send(download_result);

    // re-download by a signal, or timeout (whichever comes first)
    _ = timeout(Duration::from_minutes(10), outdated_data_signal_receiver.recv()).await;
}

Upvotes: 3

Related Questions