Reputation: 3118
async
predicate, the "easy" wayOne way would be to join_all!()
the Future
s that compute the filters on every item. And then filters synchronously based on those:
let arr = vec![...]
let filters = join_all!(arr.iter().map(|it| async { predicate(it).await })
let filtered = arr.enumerate().filter(|index, item| filters[index]).collect::<Vec<_>>();
However, exploring Rust, there's a cleaner way via futures::stream::iter
iterators:
let filtered = futures::stream::iter(vec![...])
.filter(|item| async { predicate(item).await })
.collect::<Vec<_>>
.await
All good up to now.
What if we want to use a functional API to make the predicate
easily configurable?
In that case, our calls will look like:
let filtered = futures::stream::iter(vec![...])
.filter(by_length(4)) // neat!
.collect::<Vec<_>>
.await
And the predicate:
fn by_length(min_length: usize) -> impl FnMut(&i32) -> Future<Output = bool> {
|n| async { query_length(n).await > min_length }
}
async fn query_length(n: &i32) -> usize {
// pretend we're making a network request to fetch `len`...
// for easy reproducibility's sake this will work here
n.to_string().len()
}
Unfortunately compiler is not happy anymore: it complains the Future needs dyn
keyword. And, after adding dyn
, it complains it's not Sized
, as in this minimal reproduction:
use futures::future::Future;
#[tokio::main]
async fn main() {
let arr = vec![10, 100, 1000];
let filtered = futures::stream::iter(arr.into_iter())
.filter(by_length(3))
.collect::<Vec<_>>()
.await;
println!("{:?}", filtered); // should print [100, 1000]
}
fn by_length(min_length: usize) -> impl FnMut(&i32) -> Future<Output = bool> {
|n| async { query_length(n).await > min_length }
}
// yeah it doesn't need to be async in this case, but let's pretend
async fn query_length(n: &i32) -> usize {
n.to_string().len()
}
The error:
Compiling playground v0.0.1 (/playground)
error[E0277]: the size for values of type `(dyn futures::Future<Output = bool> + 'static)` cannot be known at compilation time
--> src/main.rs:16:9
|
16 | |n| async { query_length(n).await > min_length }
| ^ doesn't have a size known at compile-time
|
= help: the trait `Sized` is not implemented for `(dyn futures::Future<Output = bool> + 'static)`
= note: the return type of a function must have a statically known size
dyn
keyword requested by the compiler here is to make it explicit it will rely automatic dispatch, incurring in additional overhead.
However we only need a single typed case here, for which we should be able to generate inline-able machine code. How can Rust be instructed to do that?Upvotes: 6
Views: 2083
Reputation: 154956
While you can't return an impl Future
from an impl FnMut
, you can return a boxed future, i.e. a dyn Future
which must be boxed because it's in return position. After a bit of borrow checker tetris, we arrive to this:
fn by_length(min_length: usize) -> impl FnMut(&i32) -> Pin<Box<dyn Future<Output = bool>>> {
move |&n| Box::pin(async move { query_length(&n).await > min_length })
}
However we only need a single typed case here, for which we should be able to generate inline-able machine code. How can Rust be instructed to do that?
I don't think that's currently possible, at least not if you're invoking an async fn
like query_length()
. Consider a manually written implementation:
fn by_length(min_length: usize) -> impl FnMut(&i32) -> ByLength {
move |&n| ByLength { n }
}
Now, how do we define ByLength
? It must implement Future
, and its poll()
simply transmits the result of polling query_length(n)
. But the future returned by query_length(n)
could suspend multiple times, so ByLength
must store the future so it can poll it as many times as needed - for example:
struct ByLength {
n: i32,
query_fut: Option<???>,
}
impl Future for ByLength {
type Output = usize;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
if self.query_fut.is_none() {
self.query_fut = Some(query_length(self.n));
}
self.query_fut.unwrap().poll(cx)
}
}
But now the problem becomes apparent: there is no type to substitute for ???
because query_length()
is an async function which returns a future of an anonymous type. Making ByLength
generic doesn't work because then we're back at the problem that a closure can't return a generic type that it provides. The signature we'd like would require higher-kinded types:
fn by_length(min_length: usize) -> impl for<T> FnMut(&i32) -> ByLength<T> {
move |&n| ByLength { n }
}
...but if we had that, we could just use query_length()
directly:
fn by_length(min_length: usize) -> impl for<T: Future<Output = usize>> FnMut(&i32) -> T {
move |&n| async move { by_length(&n) }
}
Upvotes: 5