opensourcegeek

Reputation: 5962

How to cancel an infinite stream from within the stream itself?

I'm trying to cancel an interval (interval_timer) once a queue has been emptied, but I'm not sure what the right strategy is.

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

I tried drop, as suggested on Gitter, but that resulted in an error:

let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();

let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });

let s = timer.for_each(move |_| {
    println!("Woke up");
    if some_vars.len() == 1 {
        drop(interval_timer);
    }

    let item = some_vars.pop().unwrap();

    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});

tokio::run(s);

The error:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
   |
60 |     let mut interval_timer = tokio_timer::Timer::default();
   |         ------------------ captured outer variable
...
72 |                 drop(interval_timer);
   |                      ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
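
A note on the error itself: E0507 appears because for_each takes an FnMut closure, which may run many times, so it cannot give away ownership of a captured variable on any single call. One common way around that is to wrap the value in an Option and call take(). The sketch below uses only std; Guard and call_n are hypothetical stand-ins for the timer and for for_each. It only addresses the compile error, and whether dropping the timer actually ends the stream is a separate matter.

```rust
// Hypothetical stand-in for the timer; it only reports when it is dropped.
struct Guard(&'static str);

impl Drop for Guard {
    fn drop(&mut self) {
        println!("dropped {}", self.0);
    }
}

// Hypothetical helper: calls an `FnMut` closure `n` times, the way `for_each`
// calls its closure once per stream item.
fn call_n<F: FnMut() -> bool>(n: usize, mut f: F) -> Vec<bool> {
    (0..n).map(|_| f()).collect()
}

fn drop_once_demo() -> Vec<bool> {
    // Wrapping the value in `Option` lets the closure move it out through
    // mutable access with `take()`, leaving `None` behind.
    let mut guard = Some(Guard("timer"));
    call_n(3, move || match guard.take() {
        // First call: we own the guard now and may drop it.
        Some(g) => {
            drop(g);
            true
        }
        // Later calls: nothing left to drop.
        None => false,
    })
}

fn main() {
    println!("{:?}", drop_once_demo());
}
```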

Upvotes: 6

Views: 2959

Answers (3)

teknopaul

Reputation: 6772

I created a copy of Tokio's Interval struct, adding a reference to a method of my application to indicate when to interrupt early.

In my case, I want to interrupt the Interval in order to shut down.

My Interval poll method looks like this:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
    if self.session.read().unwrap().shutdown {
        return Ok(Async::Ready(Some(Instant::now())));
    }

    // Wait for the delay to be done
    let _ = match self.delay.poll() {

Then you need to keep a handle on the task (call task = futures::task::current() when running inside the timeout task).

At any point you can then call task.notify() to kick the interval into action and hit your break out code, interrupting the Interval early.

Inside Interval there is a Delay struct that can be modified. You could therefore create an Interval that can be interrupted and given a new timeout, so that you interrupt it once and then continue.
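
The wake-up idea can be tried without tokio. The following is only an analogy using std threads, not the Interval copy described above: thread::park_timeout stands in for the delay, unpark stands in for task.notify(), and the Session struct with its shutdown field is an assumption modelled on the poll snippet.

```rust
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};

// Hypothetical application state, named after the `session` field above.
struct Session {
    shutdown: bool,
}

// Ticks once per `period` until `shutdown` is set; returns the tick count.
fn run_interval(session: Arc<RwLock<Session>>, period: Duration) -> u32 {
    let mut ticks = 0;
    loop {
        // Check the flag before waiting, as the `poll` method does.
        if session.read().unwrap().shutdown {
            return ticks;
        }
        ticks += 1;
        // Wait for one period, or wake early if another thread unparks us.
        thread::park_timeout(period);
    }
}

// Returns true if the worker stopped well before a full period elapsed.
fn interrupt_demo() -> bool {
    let session = Arc::new(RwLock::new(Session { shutdown: false }));
    let period = Duration::from_secs(30);
    let started = Instant::now();

    let worker_session = Arc::clone(&session);
    let worker = thread::spawn(move || run_interval(worker_session, period));

    // Set the flag first, then wake the worker so it notices immediately.
    session.write().unwrap().shutdown = true;
    worker.thread().unpark();
    worker.join().unwrap();

    started.elapsed() < period
}

fn main() {
    println!("interrupted early: {}", interrupt_demo());
}
```

The order matters: the flag is set before the wake-up, so the worker always sees shutdown when it re-checks after being woken.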

Upvotes: 0

Shepmaster

Reputation: 430681

For cases where you want to cancel a stream from outside of the stream, see stream-cancel.


For your specific case, it's easiest to convert your collection into a stream and zip it together with the interval timer. This way, the resulting stream naturally stops when the collection is empty:

use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let some_vars = stream::iter_ok(some_vars.into_iter().rev());
        let combined = timer.zip(some_vars);

        combined.for_each(move |(_, item)| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
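
The reason zip ends the stream can be seen with plain iterators, whose zip follows the same rule: it finishes as soon as either side runs out. This is a std-only analogy rather than tokio code, and drain_with_ticks is a hypothetical name.

```rust
// Pairs an endless "tick" source with the items; `zip` ends with the shorter side.
fn drain_with_ticks(items: Vec<i32>) -> Vec<i32> {
    // `repeat(())` never ends, standing in for the infinite interval.
    let ticks = std::iter::repeat(());
    ticks
        .zip(items.into_iter().rev())
        .map(|(_, item)| item)
        .collect()
}

fn main() {
    // Prints [8, 7, 6, 5, 4, 3, 2, 1] and then stops, although `ticks` is infinite.
    println!("{:?}", drain_with_ticks(vec![1, 2, 3, 4, 5, 6, 7, 8]));
}
```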

Otherwise, you can stop the stream by using and_then to both remove the value from the collection and control whether the stream should continue:

use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];

        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));

        let limited = timer.and_then(move |_| {
            if some_vars.len() <= 4 {
                Err(())
            } else {
                some_vars.pop().ok_or(())
            }
        });

        limited.for_each(move |item| {
            eprintln!("Woke up");

            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));

            Ok(())
        })
    });
}
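
Note that this version ends the stream by returning an error, so the stream finishes as failed rather than completed. A rough std-only analogy, assuming Iterator::try_for_each as a stand-in for and_then followed by for_each (drain_until_low is a hypothetical name):

```rust
// Pops items on every "tick" until only `keep` remain, then stops with an error.
fn drain_until_low(mut items: Vec<i32>, keep: usize) -> (Vec<i32>, Result<(), ()>) {
    let mut seen = Vec::new();
    // `repeat(())` is infinite; the first `Err` returned by the closure ends it.
    let result: Result<(), ()> = std::iter::repeat(()).try_for_each(|_| {
        if items.len() <= keep {
            return Err(());
        }
        match items.pop() {
            Some(item) => {
                seen.push(item);
                Ok(())
            }
            None => Err(()),
        }
    });
    (seen, result)
}

fn main() {
    let (seen, result) = drain_until_low(vec![1, 2, 3, 4, 5, 6, 7, 8], 4);
    // Prints [8, 7, 6, 5] Err(())
    println!("{:?} {:?}", seen, result);
}
```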

Upvotes: 5

wolandr

Reputation: 52

tokio_timer::Interval implements futures::Stream, so try to use the take_while method:

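let s = timer
    // `_` matches whatever item type the interval yields.
    // take_while keeps the stream alive while the predicate resolves to true,
    // so this assumes is_net_completed() returns true once the work is done.
    .take_while(|_| future::ok(!is_net_completed()))
    .for_each(move |_| {
        println!("Woke up");
        // ...
    });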
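
The direction of the predicate is easy to get backwards: take_while keeps yielding while the predicate is true and ends on the first false. A small std-only sketch of that behaviour, using Iterator::take_while as an analogy and a hypothetical remaining counter in place of a real completion check:

```rust
use std::cell::Cell;

// Counts how many "ticks" run before the pending work is used up.
fn ticks_until_done(work_items: u32) -> u32 {
    // `Cell` lets both closures share the counter without borrow conflicts.
    let remaining = Cell::new(work_items);
    let mut ticks = 0;
    std::iter::repeat(())
        // Continue only while work remains; the first `false` ends iteration.
        .take_while(|_| remaining.get() > 0)
        .for_each(|_| {
            remaining.set(remaining.get() - 1);
            ticks += 1;
        });
    ticks
}

fn main() {
    // Prints 3: the endless source is cut off once nothing remains.
    println!("{}", ticks_until_done(3));
}
```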

Upvotes: -2
