Artem Malinko

Reputation: 1771

How can I mutate state inside async block with StreamExt::scan method in Rust?

I'm trying to use the scan method from StreamExt. It works perfectly when the closure does not use an async block.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| {
            *s += i;
            futures::future::ready(Some(*s))
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}

But if I use an async block, it doesn't compile.

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| async move {
            *s += i;
            Some(*s)
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}

The error is:

error: borrowed data cannot be stored outside of its closure
  --> src/main.rs:6:36
   |
5  |  /     stream::iter(1..10)
6  |  |         .scan(0, |s, i| async move {
   |  |__________________------____________^
   | ||                  |
   | ||                  ...because it cannot outlive this closure
7  | ||             *s += i;
8  | ||             Some(*s)
9  | ||         })
   | ||_________^ cannot be stored outside of its closure
...   |
12 |  |         })
13 |  |         .await;
   |  |______________- borrowed data cannot be stored into here...

How can I solve this issue and mutate state inside an async block?

Upvotes: 2

Views: 700

Answers (2)

Peter Hall

Reputation: 58695

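The future returned by the closure can't hold on to the &mut reference it was given. The closure passed to scan must return the same future type on every call, so that type cannot depend on the lifetime of the borrow of the state. With async move, the block captures s, and the borrow would then have to outlive the closure call that received it, which is what the compiler rejects.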

One solution is to use a reference counted smart pointer and internal mutability:

use futures::{stream, StreamExt};
use std::cell::Cell;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Rc::new(Cell::new(0)), |s, i| {
            let s = s.clone();
            async move {
                s.set(s.get() + i);
                Some(s.get())
            }
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}

The s in the closure argument is an &mut Rc<Cell<i32>>, a borrow that cannot be moved into the returned future. The cloned version is an owned Rc<Cell<i32>>, which can be moved there because it carries no lifetime to keep track of.
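To see the shape of the problem without the futures crate, here is a minimal std-only sketch. It is not the real scan implementation: `scan_all`, `block_on`, `NoopWake`, and `running_sums` are hypothetical names made up for illustration. `scan_all` only mirrors the closure bound (one fixed `Fut` type for every call), which is why the closure has to hand the future an owned `Rc` rather than the borrow.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that never return Pending.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in with the same closure shape as `scan`:
// `Fut` is a single type, so it cannot name the lifetime of the `&mut S` borrow.
fn scan_all<S, Fut, F>(mut state: S, items: impl IntoIterator<Item = i32>, mut f: F) -> Vec<i32>
where
    F: FnMut(&mut S, i32) -> Fut,
    Fut: Future<Output = Option<i32>>,
{
    let mut out = Vec::new();
    for item in items {
        match block_on(f(&mut state, item)) {
            Some(v) => out.push(v),
            None => break, // `None` ends the scan early
        }
    }
    out
}

fn running_sums() -> Vec<i32> {
    scan_all(Rc::new(Cell::new(0)), 1..10, |s, i| {
        let s = Rc::clone(s); // owned handle: no borrow escapes the closure
        async move {
            s.set(s.get() + i);
            Some(s.get())
        }
    })
}

fn main() {
    println!("{:?}", running_sums());
}
```

Writing `|s, i| async move { *s += i; Some(*s) }` against `scan_all` should be rejected for the same reason as in the question, since the future would need to borrow from `s`.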

Upvotes: 4

belltoy

Reputation: 76

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| {       // s: &mut i32, i: i32
            *s += i;
            let st = *s;        // `*s` is an i32, which implements Copy, so this copies
                                // the current value into the owned `st` instead of
                                // moving out of the borrow.
                                //
                                // Note: this works only if the state type implements Copy

            async move {        // move the owned i32 `st` into the async block (another copy);
                                // `s` itself is not captured, so no borrow escapes
                Some(st)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}

If the state type does not implement the Copy trait, you must clone it explicitly:

use futures::{stream, StreamExt};

#[derive(Debug, Clone)]
struct TT {
    s: i32,
}

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(TT{s: 0}, |s, i|  {
            s.s += i;
            // let st = *s;          // error: cannot move out of `*s`, because `TT` does not implement `Copy`
            let st = s.clone();      // clone explicitly instead
            async move {
                Some(st)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}

If you want to modify s inside the async block, the block must own it through a smart pointer such as Rc or Arc. You also need a way to mutate the value the smart pointer wraps, such as Mutex, RwLock, or Cell:

use futures::{stream, StreamExt, lock::Mutex};
use std::sync::Arc;  // Arc or Rc

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Arc::new(Mutex::new(0)), |s, i| { // s: &mut Arc<Mutex<i32>>, i: i32
            let s = Arc::clone(s);              // clone the Arc to get an owned handle

            async move {        // move the owned `s` into the async block;
                                // `i` is moved in too (it is Copy, so it is copied)
                let mut guard = s.lock().await;
                *guard += i;
                Some(*guard)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}
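On the "Arc or Rc" choice, here is a rough std-only sketch, using plain threads as an analogy for a future that must be `Send` (for example, one handed to a multi-threaded executor). `require_send` and `add_from_thread` are hypothetical helpers, and `std::sync::Mutex` stands in for `futures::lock::Mutex`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical helper: accepts only values that may cross thread boundaries.
fn require_send<T: Send>(value: T) -> T {
    value
}

// Adds `i` to the shared state from another thread and returns the new total.
fn add_from_thread(state: &Arc<Mutex<i32>>, i: i32) -> i32 {
    // Clone the handle so the spawned closure owns it.
    let handle = require_send(Arc::clone(state));
    thread::spawn(move || {
        let mut guard = handle.lock().unwrap();
        *guard += i;
        *guard
    })
    .join()
    .unwrap()
}

fn main() {
    let state = Arc::new(Mutex::new(0));
    // Each thread is joined before the next starts, so the sums are sequential.
    let sums: Vec<i32> = (1..4).map(|i| add_from_thread(&state, i)).collect();
    println!("{:?}", sums); // prints [1, 3, 6]
}
```

Replacing `Arc<Mutex<i32>>` with `Rc<Cell<i32>>` here would not compile, because `Rc` is not `Send`. When the future never leaves the current thread, as in the examples above that are simply awaited in main, `Rc` with `Cell` is enough.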

Upvotes: 1
