Reputation: 1846
I have a function, collect_n, that returns a Future that repeatedly polls a futures::sync::mpsc::Receiver and collects the results into a vector, and resolves with that vector. The problem is that it consumes the Receiver, so that Receiver can't be used again. I'm trying to write a version that takes ownership of the Receiver but then returns ownership back to the caller when the returned Future resolves.
Here's what I wrote:
/// Like collect_n but returns the mpsc::Receiver it consumes so it can be reused.
pub fn collect_n_reusable<T>(
    mut rx: mpsc::Receiver<T>,
    n: usize,
) -> impl Future<Item = (mpsc::Receiver<T>, Vec<T>), Error = ()> {
    let mut events = Vec::new();
    future::poll_fn(move || {
        while events.len() < n {
            let e = try_ready!(rx.poll()).unwrap();
            events.push(e);
        }
        let ret = mem::replace(&mut events, Vec::new());
        Ok(Async::Ready((rx, ret)))
    })
}
This results in a compile error:
error[E0507]: cannot move out of `rx`, a captured variable in an `FnMut` closure
   --> src/test_util.rs:200:26
    |
189 |     mut rx: mpsc::Receiver<T>,
    |     ------ captured outer variable
...
200 |         Ok(Async::Ready((rx, ret)))
    |                          ^^ move occurs because `rx` has type `futures::sync::mpsc::Receiver<T>`, which does not implement the `Copy` trait
How can I accomplish what I'm trying to do?
Upvotes: 2
Views: 1654
Reputation: 8793
This is not possible unless the variable is shareable, like an Rc or Arc. Because an FnMut closure can be called multiple times, it might need to return the captured variable more than once. But once the closure has returned the variable it no longer owns it, so it would have nothing to return on the next call. Rust rejects this for safety.
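To make the FnOnce/FnMut distinction above concrete, here is a minimal std-only sketch. `Resource`, `call_once`, `call_twice` and `demo` are hypothetical names invented for this illustration; `Resource` merely stands in for a non-Copy value such as a Receiver.

```rust
/// Hypothetical stand-in for a non-Copy value such as a Receiver.
#[derive(Debug, PartialEq)]
pub struct Resource(pub u32);

/// Takes a closure that can be called only once, so the closure is
/// allowed to move its captured values out.
pub fn call_once<F: FnOnce() -> Resource>(f: F) -> Resource {
    f()
}

/// Takes a closure that can be called repeatedly. Such a closure may
/// mutate what it captured, but it cannot give a captured value away.
pub fn call_twice<F: FnMut() -> u32>(mut f: F) -> (u32, u32) {
    (f(), f())
}

pub fn demo() -> (Resource, (u32, u32)) {
    let a = Resource(1);
    // Accepted: the closure is consumed by the call, so `a` can move out.
    let moved = call_once(move || a);

    let mut b = Resource(10);
    // Accepted: `b` is mutated in place and stays inside the closure.
    let counts = call_twice(move || {
        b.0 += 1;
        b.0
    });

    // Rejected with E0507 if uncommented, because the second call would
    // have nothing left to move:
    // let mut c = Resource(5);
    // call_twice(move || { c.0 += 1; let gone = c; gone.0 });

    (moved, counts)
}
```

Since poll_fn requires an FnMut closure, the closure in the question falls into the rejected case.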
From your logic we know that once the Future is ready it will not need to be polled again, so we can build a solution without a smart pointer. Consider wrapping the receiver in a container object; I've used Option:
use futures::sync::mpsc;
use futures::{Future, Async, try_ready};
use futures::stream::Stream;
use core::mem;

pub fn collect_n_reusable<T>(
    rx: mpsc::Receiver<T>,
    n: usize,
) -> impl Future<Item = (mpsc::Receiver<T>, Vec<T>), Error = ()> {
    let mut events = Vec::new();
    let mut rx = Some(rx); // wrapped in an Option
    futures::future::poll_fn(move || {
        while events.len() < n {
            // Poll the receiver through the Option.
            let e = try_ready!(rx.as_mut().unwrap().poll()).unwrap();
            events.push(e);
        }
        let ret = mem::replace(&mut events, Vec::new());
        // Take the receiver out of the Option and return it.
        // Careful: this panics if this line is executed more than once.
        Ok(Async::Ready((rx.take().unwrap(), ret)))
    })
    .fuse()
}
Basically, the closure captures the container rather than the receiver itself, which makes the compiler happy. I used fuse() to make sure the closure is not called again after it returns Async::Ready; otherwise further polls would panic.
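The same Option trick can be tried without the futures crate. The following is only a rough std-only analogue, not the futures code itself: `collect_n_poller` is a hypothetical name, std::sync::mpsc with try_recv stands in for the futures channel, and returning None plays the role of both Async::NotReady and the fused state.

```rust
use std::sync::mpsc::{Receiver, TryRecvError};

/// Hypothetical std-only analogue of `collect_n_reusable`. The returned
/// closure yields `None` while fewer than `n` items have arrived and
/// `Some((receiver, items))` exactly once when `n` items are collected.
pub fn collect_n_poller<T>(
    rx: Receiver<T>,
    n: usize,
) -> impl FnMut() -> Option<(Receiver<T>, Vec<T>)> {
    let mut events = Vec::new();
    let mut rx = Some(rx); // the closure captures the Option, not the Receiver
    move || {
        // Once the receiver has been handed back, keep returning None,
        // which mimics what fuse() does for the future.
        let receiver = rx.as_ref()?;
        while events.len() < n {
            match receiver.try_recv() {
                Ok(e) => events.push(e),
                Err(TryRecvError::Empty) => return None, // not ready yet
                Err(TryRecvError::Disconnected) => panic!("sender dropped early"),
            }
        }
        let ret = std::mem::take(&mut events);
        // take() leaves None behind, so ownership moves out only once.
        rx.take().map(|r| (r, ret))
    }
}
```

Checking the Option at the top of the closure is a design choice of this sketch: it replaces the panic on a repeated call with a harmless None, so no separate fuse step is needed here.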
The other solution is to use std::mem::swap:
futures::future::poll_fn(move || {
    while events.len() < n {
        let e = try_ready!(rx.poll()).unwrap();
        events.push(e);
    }
    // Create an object of the same type to swap with the actual receiver.
    let mut place_holder_receiver = futures::sync::mpsc::channel(0).1;
    let ret = mem::replace(&mut events, Vec::new());
    // Swap the actual receiver with the placeholder.
    mem::swap(&mut place_holder_receiver, &mut rx);
    // place_holder_receiver now holds the actual receiver, so return it.
    Ok(Async::Ready((place_holder_receiver, ret)))
})
.fuse()
Here I've simply swapped the actual receiver with place_holder_receiver. Since the placeholder is created inside the FnMut closure (not captured), we can return it as many times as we want, so the compiler is happy again. Thanks to fuse(), the closure will not be called again after a successful return; otherwise it would return a placeholder Receiver whose Sender has already been dropped.
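The placeholder idea can also be sketched with the standard library alone. `take_receiver` is a hypothetical helper written for this illustration, and it uses std::sync::mpsc in place of the futures channel.

```rust
use std::mem;
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical helper: moves the real receiver out of a `&mut` slot by
/// leaving a freshly created placeholder receiver in its place.
pub fn take_receiver<T>(slot: &mut Receiver<T>) -> Receiver<T> {
    // Only the receiving half is kept. The sending half is dropped at the
    // end of this statement, so the placeholder can never yield a value.
    let placeholder = channel().1;
    // mem::replace stores the placeholder and returns the old value.
    mem::replace(slot, placeholder)
}
```

After the call, the slot still holds a valid Receiver, but receiving from it only reports that its Sender is gone, which is why a second successful poll in the version above would hand back a useless receiver.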
Upvotes: 3