Vana
Vana

Reputation: 933

future created by async block is not `Send`

I do a server update in rust. it create patches between 2 binaries files, and serves static files

I try to do

    let mut update_state;
    if let Some(state) = update_stream.next().await {
        if let Ok(state) = state {
            update_state = state
        } else if let Err(err) = state {
            reply = BuildOutput { error: "Update failed: ".to_string() + &err.to_string() }
        }
    } else {
        reply = BuildOutput { error: "Unreacheable".to_string() }
    }

    let state = update_state.borrow();
    let progress = state.histogram.progress();

    let res = update_stream.try_for_each(|_state| future::ready(Ok(()))).await;

but get

note: future is not `Send` as this value is used across an await
   --> server\grpc\src\rpc.rs:260:50
    |
259 |         let mut update_state;
    |             ---------------- has type `SharedUpdateProgress` which is not `Send`
260 |         if let Some(state) = update_stream.next().await {
    |                                                  ^^^^^^ await occurs here, with `mut update_state` maybe used later
...
305 |     }
    |     - `mut update_state` is later dropped here
    = note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send

SharedUpdateProgress:

#[derive(Clone)]
pub struct SharedUpdateProgress {
    state: Rc<RefCell<UpdateProgress>>,
}

impl SharedUpdateProgress {
    pub fn new(target_revision: CleanName) -> Self {
        Self { state: Rc::new(RefCell::new(UpdateProgress::new(target_revision))) }
    }

    pub fn borrow(&self) -> Ref<'_, UpdateProgress> {
        self.state.borrow()
    }

    pub(crate) fn borrow_mut(&self) -> RefMut<'_, UpdateProgress> {
        self.state.borrow_mut()
    }
}

I don't know why and don't know how to fix it

Upvotes: 7

Views: 15190

Answers (1)

Finomnis
Finomnis

Reputation: 22728

I assume a minimal reproducible example of your problem is as follows:

use std::{cell::RefCell, rc::Rc};
use tokio::time::{sleep, Duration};

#[derive(Clone)]
pub struct SharedString {
    state: Rc<RefCell<String>>,
}

impl SharedString {
    pub fn new(initial: &str) -> Self {
        Self {
            state: Rc::new(RefCell::new(initial.into())),
        }
    }
}

async fn run() {
    let shared_string = SharedString::new("Hello,");
    sleep(Duration::from_millis(1)).await;
    *shared_string.state.borrow_mut() += " world!";
    sleep(Duration::from_millis(1)).await;
    println!("{:?}", shared_string.state.borrow());
}

#[tokio::main]
async fn main() {
    tokio::task::spawn(run()).await.unwrap();
}
error: future cannot be sent between threads safely
   --> src/main.rs:27:24
    |
27  |     tokio::task::spawn(run()).await.unwrap();
    |                        ^^^^^ future returned by `run` is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<RefCell<String>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:19:36
    |
18  |     let shared_string = SharedString::new("Hello,");
    |         ------------- has type `SharedString` which is not `Send`
19  |     sleep(Duration::from_millis(1)).await;
    |                                    ^^^^^^ await occurs here, with `shared_string` maybe used later
...
23  | }
    | - `shared_string` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/martin/.cargo/git/checkouts/tokio-dd4afa005f1f4b79/686577b/tokio/src/task/spawn.rs:163:21
    |
163 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

The tokio Runtime is usually multi-threaded, meaning that at any .await point your task could get moved from one thread to another. That's why everything that is held across an .await point must be Send. Which Rc<RefCell<>> is explicitely not, because it's a single-threaded reference counter.

Solution: Replace Rc<RefCell<>> with Arc<Mutex<>>, which is the thread-safe equivalent.

use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

#[derive(Clone)]
pub struct SharedString {
    state: Arc<Mutex<String>>,
}

impl SharedString {
    pub fn new(initial: &str) -> Self {
        Self {
            state: Arc::new(Mutex::new(initial.into())),
        }
    }
}

async fn run() {
    let shared_string = SharedString::new("Hello,");
    sleep(Duration::from_millis(1)).await;
    *shared_string.state.lock().unwrap() += " world!";
    sleep(Duration::from_millis(1)).await;
    println!("{:?}", shared_string.state.lock().unwrap());
}

#[tokio::main]
async fn main() {
    tokio::task::spawn(run()).await.unwrap();
}
"Hello, world!"

Upvotes: 5

Related Questions