Reputation: 5962
I'm trying to cancel an interval (interval_timer) after emptying a queue, but I'm not sure what the right strategy is.
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let interval_timer = tokio_timer::Timer::default();
let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });
let s = timer.for_each(move |_| {
    println!("Woke up");
    let item = some_vars.pop().unwrap();
    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});
tokio::run(s);
I tried drop, as suggested on Gitter, but that ended up with an error:
let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
let mut interval_timer = tokio_timer::Timer::default();
let timer = interval_timer
    .interval(Duration::from_millis(1000))
    .map_err(|_| {
        println!("Errored out");
    });
let s = timer.for_each(move |_| {
    println!("Woke up");
    if some_vars.len() == 1 {
        drop(interval_timer);
    }
    let item = some_vars.pop().unwrap();
    let f = futures::future::ok(item).map(|x| {
        println!("{:?}", x);
    });
    tokio::spawn(f)
});
tokio::run(s);
The error:
error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
--> src/main.rs:72:22
|
60 | let mut interval_timer = tokio_timer::Timer::default();
| ------------------ captured outer variable
...
72 | drop(interval_timer);
| ^^^^^^^^^^^^^^ cannot move out of captured outer variable in an `FnMut` closure
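For context, the error comes from how the closure is typed: for_each takes an FnMut closure that may run many times, so it cannot give away a captured variable by value. A common workaround is to hold the value in an Option and take() it. The sketch below uses only std; Handle, run_ticks and drop_on_tick are hypothetical stand-ins, not tokio APIs, and it says nothing about whether dropping the Timer actually stops an interval.

```rust
// Hypothetical stand-in for a resource such as the timer; not a tokio type.
struct Handle;

// Calls `f` once per tick, much like `for_each` repeatedly calls its `FnMut` closure.
fn run_ticks<F: FnMut(u32)>(ticks: u32, mut f: F) {
    for t in 0..ticks {
        f(t);
    }
}

// Returns the tick on which the handle was dropped, if any.
fn drop_on_tick(target: u32, ticks: u32) -> Option<u32> {
    let mut handle = Some(Handle);
    let mut dropped_at = None;
    run_ticks(ticks, |t| {
        if t == target {
            // `take()` moves the value out of the Option and leaves `None`
            // behind, so the closure still owns a valid (now empty) slot.
            if let Some(h) = handle.take() {
                drop(h);
                dropped_at = Some(t);
            }
        }
    });
    dropped_at
}
```

Calling drop_on_tick(2, 5) drops the handle on the third tick; later ticks find the slot empty and do nothing.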
Upvotes: 6
Views: 2959
Reputation: 6772
I created a copy of Tokio's Interval struct, adding a reference to my application's session state, which indicates when to interrupt early.
In my case, I want to interrupt the Interval in order to shut down.
My Interval poll method looks like this:
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if self.session.read().unwrap().shutdown {
return Ok(Async::Ready(Some(Instant::now())));
}
// Wait for the delay to be done
let _ = match self.delay.poll() {
Then you need to keep a handle on the task: call task = futures::task::current() while running inside the timeout task.
At any point you can then call task.notify() to kick the interval into action and hit your break-out code, interrupting the Interval early.
Inside Interval there is a Delay struct that can be modified. You could use it to build an Interval that can be interrupted and have its timeout changed, so that it is interrupted once and then continues.
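As a rough, std-only illustration of the shared-flag idea, here is a sketch that models the interval as a plain Iterator instead of a futures Stream. Session (apart from its shutdown field), InterruptibleTicks and ticks_before_shutdown are hypothetical names. Unlike the poll fragment above, it simply ends the sequence once the flag is set, and it has no counterpart to task.notify().

```rust
use std::sync::{Arc, RwLock};

// Hypothetical application state; only the `shutdown` field mirrors the answer.
struct Session {
    shutdown: bool,
}

// Simplified stand-in for an interruptible interval: an iterator of tick
// numbers that checks the shared flag before producing each tick.
struct InterruptibleTicks {
    session: Arc<RwLock<Session>>,
    next: u32,
}

impl Iterator for InterruptibleTicks {
    type Item = u32;

    fn next(&mut self) -> Option<u32> {
        if self.session.read().unwrap().shutdown {
            return None; // flag set: end the sequence early
        }
        let tick = self.next;
        self.next += 1;
        Some(tick)
    }
}

// Consumes ticks and sets the flag once `stop_after` of them have been handled.
fn ticks_before_shutdown(stop_after: u32) -> Vec<u32> {
    let session = Arc::new(RwLock::new(Session { shutdown: false }));
    let ticks = InterruptibleTicks {
        session: Arc::clone(&session),
        next: 0,
    };
    let mut seen = Vec::new();
    for t in ticks {
        seen.push(t);
        if seen.len() as u32 >= stop_after {
            session.write().unwrap().shutdown = true;
        }
    }
    seen
}
```

The flag is only checked when the next tick is requested, which is why the real Stream version also needs the task.notify() call to wake the task before the delay expires.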
Upvotes: 0
Reputation: 430681
For cases where you want to cancel a stream from outside of the stream, see stream-cancel.
For your specific case, it's easiest to convert your collection into a stream and zip it together with the interval timer. This way, the resulting stream naturally stops when the collection is empty:
use futures::{future, stream, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
        let some_vars = stream::iter_ok(some_vars.into_iter().rev());
        let combined = timer.zip(some_vars);
        combined.for_each(move |(_, item)| {
            eprintln!("Woke up");
            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));
            Ok(())
        })
    });
}
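The "stops when the shorter side runs out" rule of zip can be checked without a runtime, because std iterators follow the same rule. This is only an analogy, not the Stream code itself; zipped_with_ticks is a hypothetical helper and an endless counter stands in for the interval.

```rust
// Pairs each item (taken from the back, as in the answer) with a tick number.
fn zipped_with_ticks(items: Vec<i32>) -> Vec<(u64, i32)> {
    let ticks = 0u64..; // endless tick counter standing in for the interval
    // `zip` ends as soon as either side is exhausted, so the endless
    // counter cannot keep the combined sequence alive on its own.
    ticks.zip(items.into_iter().rev()).collect()
}
```

With an empty collection the result is empty, which corresponds to the timer stream finishing without ever calling the for_each closure.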
Otherwise, you can stop the stream by using and_then to both remove the value from the collection and control whether the stream should continue:
use futures::{future, Stream}; // 0.1.29
use std::time::Duration;
use tokio; // 0.1.22
use tokio_timer::Interval; // 0.2.11

fn main() {
    tokio::run({
        let mut some_vars = vec![1, 2, 3, 4, 5, 6, 7, 8];
        let timer =
            Interval::new_interval(Duration::from_millis(100)).map_err(|e| panic!("Error: {}", e));
        let limited = timer.and_then(move |_| {
            if some_vars.len() <= 4 {
                Err(())
            } else {
                some_vars.pop().ok_or(())
            }
        });
        limited.for_each(move |item| {
            eprintln!("Woke up");
            tokio::spawn(future::lazy(move || {
                println!("{:?}", item);
                Ok(())
            }));
            Ok(())
        })
    });
}
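To see which items that control flow lets through, here is a std-only analogy using Iterator::try_for_each, which also stops at the first Err. It is a sketch of the logic only, not of the Stream API; items_until_limit is a hypothetical helper and the endless counter again stands in for the interval.

```rust
// Returns the items handled before the stop condition, plus the final outcome.
fn items_until_limit(mut some_vars: Vec<i32>) -> (Vec<i32>, Result<(), ()>) {
    let mut seen = Vec::new();
    // One closure call per tick; the first `Err` ends the iteration.
    let outcome = (0u64..).try_for_each(|_tick| {
        if some_vars.len() <= 4 {
            return Err(()); // same stop condition as in the answer
        }
        match some_vars.pop() {
            Some(item) => {
                seen.push(item);
                Ok(())
            }
            None => Err(()),
        }
    });
    (seen, outcome)
}
```

Note that stopping this way ends with an error value instead of a normal completion, just as the and_then version signals the stop through Err(()).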
Upvotes: 5
Reputation: 52
tokio_timer::Interval implements futures::Stream, so try using the take_while method:
let s = timer
    // The stream keeps going only while the predicate resolves to true.
    .take_while(|_| future::ok(is_net_completed()))
    .for_each(move |_| {
        println!("Woke up");
        // ...
    });
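Since take_while keeps producing items only while the predicate is true, the predicate has to describe "keep going", not "finished". A std-only sketch of draining a queue this way is below; it uses Iterator::take_while as an analogy for the Stream method, and drain_on_ticks is a hypothetical helper.

```rust
use std::cell::RefCell;

// Pops one item per tick and stops once the queue is empty.
fn drain_on_ticks(items: Vec<i32>) -> Vec<i32> {
    // RefCell lets the predicate read the queue while the mapper mutates it.
    let queue = RefCell::new(items);
    let drained: Vec<i32> = (0u64..)
        // Keep ticking only while there is still work left.
        .take_while(|_| !queue.borrow().is_empty())
        // The predicate just confirmed the queue is non-empty, so `pop` succeeds.
        .map(|_| queue.borrow_mut().pop().unwrap())
        .collect();
    drained
}
```

Here the sequence ends normally when the queue is empty, with no error value involved.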
Upvotes: -2