Reputation: 1771
I'm trying to use the scan method from StreamExt. It works perfectly as long as the closure does not use an async block.
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| {
            *s += i;
            futures::future::ready(Some(*s))
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}
But if I do use an async block, it doesn't compile.
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| async move {
            *s += i;
            Some(*s)
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}
Error is:
error: borrowed data cannot be stored outside of its closure
  --> src/main.rs:6:36
   |
5  | /     stream::iter(1..10)
6  | |         .scan(0, |s, i| async move {
   | |__________________------____________^
   | ||                 |
   | ||                 ...because it cannot outlive this closure
7  | ||             *s += i;
8  | ||             Some(*s)
9  | ||         })
   | ||_________^ cannot be stored outside of its closure
...  |
12 | |         })
13 | |         .await;
   | |______________- borrowed data cannot be stored into here...
How can I solve this issue and mutate state inside an async block?
Upvotes: 2
Views: 700
Reputation: 58695
You can't carry the borrowed state reference across the async boundary. The closure only receives the &mut reference for the duration of the call, but the future it returns runs later, and the compiler has no way to tie the lifetime of that borrow to a future that completes at some unknown point.
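To make the constraint concrete, here is a minimal sketch that needs no crates. It assumes a hypothetical `scan_like` helper whose closure bound has the same shape as the one on `scan`, and a hand-rolled `block_on` with a no-op waker; neither is part of futures or tokio. Because `Fut` is a single type parameter, it cannot depend on the lifetime of the `&mut S` argument, so the closure has to finish with the reference before it builds the future.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker so the sketch runs without an async runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future in a loop until it is ready.
// Fine for futures that never wait on I/O; a real runtime would park instead.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Hypothetical stand-in for `scan`: `Fut` is one fixed type, so the returned
// future is not allowed to borrow from the `&mut S` passed to the closure.
fn scan_like<S, T, B, F, Fut>(mut state: S, items: impl IntoIterator<Item = T>, mut f: F) -> Vec<B>
where
    F: FnMut(&mut S, T) -> Fut,
    Fut: Future<Output = Option<B>>,
{
    let mut out = Vec::new();
    for item in items {
        // The `&mut state` borrow ends when `f` returns; the future runs afterwards.
        let fut = f(&mut state, item);
        match block_on(fut) {
            Some(value) => out.push(value),
            None => break,
        }
    }
    out
}

fn running_sums() -> Vec<i32> {
    scan_like(0i32, 1..10, |s, i| {
        *s += i; // mutate through the reference while the closure is running
        let sum = *s; // copy the value out of the borrow
        async move { Some(sum) } // the future owns `sum`, no borrow escapes
    })
}
```

Calling `running_sums()` should produce the same running totals as the first snippet in the question. Moving `*s += i` inside the `async move` block would be rejected for the reason described above.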
One solution is to use a reference-counted smart pointer and interior mutability:
use futures::{stream, StreamExt};
use std::cell::Cell;
use std::rc::Rc;

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Rc::new(Cell::new(0)), |s, i| {
            let s = s.clone();
            async move {
                s.set(s.get() + i);
                Some(s.get())
            }
        })
        .for_each(|x| async move {
            println!("{:?}", x);
        })
        .await;
}
The s in the closure argument is a &mut Rc<Cell<i32>>, which cannot be moved across the async boundary. The cloned version is an owned Rc<Cell<i32>>, which can be moved there because there are no lifetimes to keep track of.
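The sharing behaviour can be checked in isolation with a small dependency-free sketch. The `block_on` helper and the `shared_counter_sums` function are hypothetical names introduced only for this illustration; the point is that every `async move` block owns its own `Rc` handle, while all handles refer to the same `Cell`.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical no-op waker so the sketch runs without an async runtime.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

// Returns the running sums seen inside the futures and the final shared value.
fn shared_counter_sums() -> (Vec<i32>, i32) {
    let state = Rc::new(Cell::new(0));
    let mut sums = Vec::new();
    for i in 1..10 {
        // An owned handle: the future borrows nothing from this function.
        let s = Rc::clone(&state);
        let fut = async move {
            s.set(s.get() + i); // mutate through the shared Cell
            s.get()
        };
        sums.push(block_on(fut));
    }
    // The original handle observes every update made through the clones.
    (sums, state.get())
}
```

`Rc` and `Cell` are not thread-safe, so this only suits futures that stay on one thread.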
Upvotes: 4
Reputation: 76
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(0, |s, i| { // s: &mut i32, i: i32
            *s += i;
            // `*s` is an i32, which implements Copy, so this copies the
            // current value into the owned `st` instead of moving it.
            //
            // Note: this works only if the state type implements Copy.
            let st = *s;
            // Move the owned `st` into the async block (another copy).
            async move { Some(st) }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}
If the state type does not implement the Copy trait, you must clone it explicitly:
use futures::{stream, StreamExt};

#[derive(Debug, Clone)]
struct TT {
    s: i32,
}

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(TT { s: 0 }, |s, i| {
            s.s += i;
            // let st = *s; // error: cannot move out of `*s`, because `TT` does not implement Copy
            let st = s.clone(); // clone explicitly to get an owned value
            async move { Some(st) }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}
If you want to mutate s inside the async block, the block must own the state through a smart pointer such as Rc or Arc. You also need a way to modify the value wrapped by the smart pointer, such as Mutex, RwLock, or Cell:
use futures::{lock::Mutex, stream, StreamExt};
use std::sync::Arc; // Arc or Rc

#[tokio::main]
async fn main() {
    stream::iter(1..10)
        .scan(Arc::new(Mutex::new(0)), |s, i| { // s: &mut Arc<Mutex<i32>>, i: i32
            let s = Arc::clone(s); // clone to get an owned Arc
            // Move the owned `s` into the async block; `i` is moved in
            // as well, which is just a copy for an i32.
            async move {
                let mut guard = s.lock().await;
                *guard += i;
                Some(*guard)
            }
        })
        .for_each(|x| {
            println!("{:?}", x);
            async {}
        })
        .await;
}
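On the "Arc or Rc" choice: the difference shows up once the owned handle has to cross a thread boundary. The sketch below is an illustration under swapped-in assumptions, not the code above: it uses plain threads and `std::sync::Mutex` instead of futures and `futures::lock::Mutex`, and `threaded_total` is a hypothetical name. The same clone-then-move pattern applies, but replacing `Arc` with `Rc` here would not compile because `Rc` is not `Send`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Adds 1..10 into one shared counter, with each addition on its own thread.
fn threaded_total() -> i32 {
    let state = Arc::new(Mutex::new(0));
    let handles: Vec<_> = (1..10)
        .map(|i| {
            // Clone to get an owned Arc that can be moved into the thread.
            let s = Arc::clone(&state);
            thread::spawn(move || {
                let mut guard = s.lock().unwrap();
                *guard += i;
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    // Bind the value first so the lock guard is dropped before returning.
    let total = *state.lock().unwrap();
    total
}
```

If the stream is only ever polled on a single thread, `Rc` with `Cell` or `RefCell` avoids the locking cost.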
Upvotes: 1