doplumi

Reputation: 3118

How do I use a higher-order async function to filter a Vec?

Filtering via async predicate, the "easy" way

One way is to call join_all() on the futures that compute the predicate for every item, and then filter synchronously based on the results:

let arr = vec![...];
let filters = join_all(arr.iter().map(|it| predicate(it))).await;
let filtered = arr.into_iter().enumerate()
    .filter(|(index, _)| filters[*index])
    .map(|(_, item)| item)
    .collect::<Vec<_>>();

However, while exploring Rust, I found a cleaner way via futures::stream::iter:

let filtered = futures::stream::iter(vec![...])
  .filter(|item| async { predicate(item).await })
  .collect::<Vec<_>>()
  .await;

All good up to now.


Configurable filter: trouble begins

What if we want to use a functional API to make the predicate easily configurable?

In that case, our calls will look like:

let filtered = futures::stream::iter(vec![...])
  .filter(by_length(4)) // neat!
  .collect::<Vec<_>>()
  .await;

And the predicate:

fn by_length(min_length: usize) -> impl FnMut(&i32) -> Future<Output = bool> {
    |n| async { query_length(n).await > min_length }
}

async fn query_length(n: &i32) -> usize {
    // pretend we're making a network request to fetch `len`...
    // for easy reproducibility's sake this will work here
    n.to_string().len()
}

Unfortunately, the compiler is no longer happy: it complains that Future needs the dyn keyword and, after adding dyn, that the type is not Sized, as in this minimal reproduction:

use futures::{future::Future, StreamExt};

#[tokio::main]
async fn main() {
    let arr = vec![10, 100, 1000];
    
    let filtered = futures::stream::iter(arr.into_iter())
      .filter(by_length(3))
      .collect::<Vec<_>>()
      .await;
      
    println!("{:?}", filtered); // should print [1000]: only 1000 has more than 3 digits
}

fn by_length(min_length: usize) -> impl FnMut(&i32) -> Future<Output = bool> {
    |n| async { query_length(n).await > min_length }
}

// yeah it doesn't need to be async in this case, but let's pretend
async fn query_length(n: &i32) -> usize {
    n.to_string().len()
}

The error:

   Compiling playground v0.0.1 (/playground)
error[E0277]: the size for values of type `(dyn futures::Future<Output = bool> + 'static)` cannot be known at compilation time
  --> src/main.rs:16:9
   |
16 |     |n| async { query_length(n).await > min_length }
   |         ^ doesn't have a size known at compile-time
   |
   = help: the trait `Sized` is not implemented for `(dyn futures::Future<Output = bool> + 'static)`
   = note: the return type of a function must have a statically known size

Questions

Upvotes: 6

Views: 2083

Answers (1)

user4815162342

Reputation: 154956

While you can't return an impl Future from an impl FnMut, you can return a boxed future, i.e. a dyn Future, which must be boxed because it's in return position. After a bit of borrow checker tetris, we arrive at this:

use std::{future::Future, pin::Pin};

fn by_length(min_length: usize) -> impl FnMut(&i32) -> Pin<Box<dyn Future<Output = bool>>> {
    move |&n| Box::pin(async move { query_length(&n).await > min_length })
}
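
To try the boxed version end to end without pulling in any crates, here is a minimal std-only sketch. The `filter_async` and `block_on` helpers are hypothetical stand-ins introduced only for illustration; a real program would more likely use an executor such as tokio together with `StreamExt::filter`, as in the question. `by_length` and `query_length` are the same as above.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for the network request from the question.
async fn query_length(n: &i32) -> usize {
    n.to_string().len()
}

// The boxed future gives the closure a return type that can be written down.
fn by_length(min_length: usize) -> impl FnMut(&i32) -> Pin<Box<dyn Future<Output = bool>>> {
    move |&n| Box::pin(async move { query_length(&n).await > min_length })
}

// Hypothetical helper: awaits the predicate for each item, one after another.
async fn filter_async<P>(items: Vec<i32>, mut predicate: P) -> Vec<i32>
where
    P: FnMut(&i32) -> Pin<Box<dyn Future<Output = bool>>>,
{
    let mut kept = Vec::new();
    for item in items {
        if predicate(&item).await {
            kept.push(item);
        }
    }
    kept
}

// Hypothetical busy-polling executor, only good enough for this demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let filtered = block_on(filter_async(vec![10, 100, 1000], by_length(3)));
    println!("{:?}", filtered); // [1000], because the comparison is a strict `>`
}
```

The cost of this approach is one heap allocation and one dynamic dispatch per item, which is the overhead the follow-up question below asks about.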



However we only need a single typed case here, for which we should be able to generate inline-able machine code. How can Rust be instructed to do that?

I don't think that's currently possible, at least not if you're invoking an async fn like query_length(). Consider a manually written implementation:

fn by_length(min_length: usize) -> impl FnMut(&i32) -> ByLength {
    move |&n| ByLength { n }
}

Now, how do we define ByLength? It must implement Future, and its poll() simply transmits the result of polling query_length(n). But the future returned by query_length(n) could suspend multiple times, so ByLength must store the future so it can poll it as many times as needed - for example:

struct ByLength {
    n: i32,
    query_fut: Option<???>,
}

impl Future for ByLength {
    type Output = usize;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        if self.query_fut.is_none() {
            self.query_fut = Some(query_length(self.n));
        }
        self.query_fut.unwrap().poll(cx)
    }
}

But now the problem becomes apparent: there is no type to substitute for ??? because query_length() is an async function which returns a future of an anonymous type. Making ByLength generic doesn't work because then we're back at the problem that a closure can't return a generic type that it provides. The signature we'd like would require higher-kinded types:

fn by_length(min_length: usize) -> impl for<T> FnMut(&i32) -> ByLength<T> {
    move |&n| ByLength { n }
}

...but if we had that, we could just use query_length() directly:

fn by_length(min_length: usize) -> impl for<T: Future<Output = usize>> FnMut(&i32) -> T {
    move |&n| async move { query_length(&n).await }
}
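
For contrast, here is a sketch of the one case where the manual ByLength does work without boxing: when the inner future has a type that can be named. It assumes a hypothetical non-async `query_length_named()` that returns `std::future::Ready<usize>`, which is not in the question, and a throwaway `block_on` helper to drive the future. As soon as the query is a real async fn, its future type becomes anonymous and the sketch no longer applies.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical non-async variant of query_length: because it is a plain fn,
// its return type `Ready<usize>` can be written down.
fn query_length_named(n: i32) -> Ready<usize> {
    ready(n.to_string().len())
}

struct ByLength {
    min_length: usize,
    // The `???` slot, fillable here only because the future type has a name.
    query_fut: Ready<usize>,
}

impl Future for ByLength {
    type Output = bool;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<bool> {
        let min_length = self.min_length;
        // All fields are Unpin, so the inner future can be re-pinned with Pin::new.
        Pin::new(&mut self.query_fut)
            .poll(cx)
            .map(|len| len > min_length)
    }
}

// No Box and no dyn: the closure returns a concrete, nameable future.
fn by_length(min_length: usize) -> impl FnMut(&i32) -> ByLength {
    move |&n| ByLength { min_length, query_fut: query_length_named(n) }
}

// Hypothetical busy-polling executor, only good enough for this demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let mut pred = by_length(3);
    println!("{}", block_on(pred(&1000))); // true: 4 > 3
    println!("{}", block_on(pred(&100))); // false: 3 > 3 does not hold
}
```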

Upvotes: 5
