Reputation: 933
I do a server update in rust. it create patches between 2 binaries files, and serves static files
I try to do
let mut update_state;
if let Some(state) = update_stream.next().await {
if let Ok(state) = state {
update_state = state
} else if let Err(err) = state {
reply = BuildOutput { error: "Update failed: ".to_string() + &err.to_string() }
}
} else {
reply = BuildOutput { error: "Unreacheable".to_string() }
}
let state = update_state.borrow();
let progress = state.histogram.progress();
let res = update_stream.try_for_each(|_state| future::ready(Ok(()))).await;
but get
note: future is not `Send` as this value is used across an await
--> server\grpc\src\rpc.rs:260:50
|
259 | let mut update_state;
| ---------------- has type `SharedUpdateProgress` which is not `Send`
260 | if let Some(state) = update_stream.next().await {
| ^^^^^^ await occurs here, with `mut update_state` maybe used later
...
305 | }
| - `mut update_state` is later dropped here
= note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send
SharedUpdateProgress:
#[derive(Clone)]
pub struct SharedUpdateProgress {
state: Rc<RefCell<UpdateProgress>>,
}
impl SharedUpdateProgress {
pub fn new(target_revision: CleanName) -> Self {
Self { state: Rc::new(RefCell::new(UpdateProgress::new(target_revision))) }
}
pub fn borrow(&self) -> Ref<'_, UpdateProgress> {
self.state.borrow()
}
pub(crate) fn borrow_mut(&self) -> RefMut<'_, UpdateProgress> {
self.state.borrow_mut()
}
}
I don't know why and don't know how to fix it
Upvotes: 7
Views: 15190
Reputation: 22728
I assume a minimal reproducible example of your problem is as follows:
use std::{cell::RefCell, rc::Rc};
use tokio::time::{sleep, Duration};
#[derive(Clone)]
pub struct SharedString {
state: Rc<RefCell<String>>,
}
impl SharedString {
pub fn new(initial: &str) -> Self {
Self {
state: Rc::new(RefCell::new(initial.into())),
}
}
}
async fn run() {
let shared_string = SharedString::new("Hello,");
sleep(Duration::from_millis(1)).await;
*shared_string.state.borrow_mut() += " world!";
sleep(Duration::from_millis(1)).await;
println!("{:?}", shared_string.state.borrow());
}
#[tokio::main]
async fn main() {
tokio::task::spawn(run()).await.unwrap();
}
error: future cannot be sent between threads safely
--> src/main.rs:27:24
|
27 | tokio::task::spawn(run()).await.unwrap();
| ^^^^^ future returned by `run` is not `Send`
|
= help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<RefCell<String>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:19:36
|
18 | let shared_string = SharedString::new("Hello,");
| ------------- has type `SharedString` which is not `Send`
19 | sleep(Duration::from_millis(1)).await;
| ^^^^^^ await occurs here, with `shared_string` maybe used later
...
23 | }
| - `shared_string` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/martin/.cargo/git/checkouts/tokio-dd4afa005f1f4b79/686577b/tokio/src/task/spawn.rs:163:21
|
163 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
The tokio Runtime is usually multi-threaded, meaning that at any .await
point your task could get moved from one thread to another. That's why everything that is held across an .await
point must be Send
. Which Rc<RefCell<>>
is explicitely not, because it's a single-threaded reference counter.
Solution: Replace Rc<RefCell<>>
with Arc<Mutex<>>
, which is the thread-safe equivalent.
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};
#[derive(Clone)]
pub struct SharedString {
state: Arc<Mutex<String>>,
}
impl SharedString {
pub fn new(initial: &str) -> Self {
Self {
state: Arc::new(Mutex::new(initial.into())),
}
}
}
async fn run() {
let shared_string = SharedString::new("Hello,");
sleep(Duration::from_millis(1)).await;
*shared_string.state.lock().unwrap() += " world!";
sleep(Duration::from_millis(1)).await;
println!("{:?}", shared_string.state.lock().unwrap());
}
#[tokio::main]
async fn main() {
tokio::task::spawn(run()).await.unwrap();
}
"Hello, world!"
Upvotes: 5