Reputation: 6856
I am trying to make an async cooperative multitasking executor. A feature that I would like to have is to keep some worker-local state for each task. Each worker is pinned to a thread and each thread is pinned to a core.
My idea is for information about the current worker to be returned as part of the future's output (see the `main` function in the example):
```rust
#![feature(waker_getters)]
use std::{
    future::Future,
    marker::PhantomData,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
};

type WorkQueue = Mutex<Vec<Waker>>;

/// Info about the current worker. This info should be valid only for
/// the current thread, so `WorkerInfo` is absolutely not `Send`.
#[derive(Clone)]
struct WorkerInfo {
    id: usize,
    non_send: PhantomData<*mut ()>,
}

#[derive(Clone)]
struct WorkerLocal {
    /// The worker corresponding to this thread. `WorkerLocal` is not
    /// `Send` because the info in `worker` is only valid for the
    /// current thread.
    worker: WorkerInfo,
    queue: Arc<WorkQueue>,
}

impl WorkerLocal {
    /// Yield control to the executor so that we can be scheduled
    /// later, possibly on another worker. `WorkerLocal` is no longer
    /// valid, so it should be consumed. The function returns the
    /// `WorkerLocal` of the thread where the task lands.
    async fn coop_yield(self) -> WorkerLocal {
        WorkerJumper { worker: None, queue: self.queue }.await
    }
}

/// A future for yielding control to the executor and possibly
/// continuing on a different worker later.
struct WorkerJumper {
    worker: Option<WorkerInfo>, // None when asleep
    queue: Arc<WorkQueue>,
}

impl Future for WorkerJumper {
    type Output = WorkerLocal;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(ref wid) = self.worker {
            return Poll::Ready(WorkerLocal {
                worker: wid.clone(),
                queue: self.queue.clone(),
            });
        } else {
            // Push into the work queue
            self.queue.lock().unwrap().push(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Queue some work for the executor to run. The future passed must be
/// `Send` because it may end up on a different thread.
fn spawn<F: Future<Output = ()> + Send>(f: impl FnOnce(WorkerLocal) -> F) {
    todo!()
}

fn main() {
    spawn(|t1| async {
        // here we are on worker 1
        let t2 = t1.coop_yield().await;
        // here we may be on worker 2
    });
}
```
But the compiler complains that a `Send` async block can't return `!Send` values. Why is that? I would expect a future to be `!Send` only if a `!Send` value is held across an `.await`.
Is there a safe way to get around this without making a semantically `!Send` type `Send`?
Edit: If I interpret the answers correctly, part of the problem seems to be that `WorkerLocal` crosses an "implicit await" that happens between the function taking ownership of its arguments and the contained future actually running.
I have one more tangential question: I realize that there is no way for `WorkerInfo` to both be `!Send` and be contained in a `Send` future. So I was thinking, let's take a `RefCell`-like approach and delegate the problem to the runtime. Why does replacing `main` and `spawn` in the following way still not work?
```rust
struct UnsafeSend<T>(T);
unsafe impl<T> Send for UnsafeSend<T> {}

/// Queue some work for the executor to run. The future passed must be
/// `Send` because it may end up on a different thread.
fn spawn<F: Future<Output = ()> + Send>(f: impl FnOnce(UnsafeSend<WorkerLocal>) -> F) {
    todo!()
}

fn main() {
    spawn(|t| async { t.0.coop_yield().await; });
}
```
It still refuses to build, even though `t` does not cross any implicit or explicit `.await`s.
Upvotes: 1
Views: 1140
Reputation: 169143
Recall that futures do not do any work unless polled. If you run the code `async { println!("foo"); }` but never poll the future this expression produces, nothing is printed to the screen.
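To make the laziness concrete, here is a minimal sketch using only the standard library: the async block sets a flag, but the flag only flips once the future is polled by hand. `NoopWake` and `run_lazy_demo` are hypothetical helper names made up for this illustration, not part of any executor.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical helper: a waker that does nothing when woken,
/// which is enough to poll a future once by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: returns whether the async block's body had run
/// (before polling, after polling).
fn run_lazy_demo() -> (bool, bool) {
    let ran = Cell::new(false);
    // Creating the future does not execute its body.
    let fut = async {
        ran.set(true);
    };
    let before = ran.get();

    // Poll once: the body runs to completion and the future is Ready.
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    let ready = matches!(fut.as_mut().poll(&mut cx), Poll::Ready(()));
    assert!(ready);

    (before, ran.get())
}

fn main() {
    println!("{:?}", run_lazy_demo()); // prints (false, true)
}
```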
Now if we look at your code, we can see that `spawn()` requires a function that accepts a `WorkerLocal` and returns a future that implements `Send`. This is problematic, because the `t1.coop_yield()` call hasn't happened yet, so the future must own `t1` (it captures it by move since `coop_yield()` takes `self`) so that it can make this call later, when it is polled.
This means that before it's even polled, it owns a `WorkerLocal`, which isn't `Send`, but the future must be `Send`. This is the contradiction the compiler is pointing out.
I can't think of a way you can convey this kind of thread-local information in the kind of API you want, given the way futures in Rust work; you would need to modify the signature of `Future::poll()` to convey this information.
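As a rough sketch of what "modifying the signature" could mean, assume a hypothetical trait `WorkerFuture` (not part of std, and not usable with `.await` or existing executors) whose `poll` receives the current worker as an extra argument. Because the executor passes a `&WorkerInfo` on every poll, the task never has to store one, so the task itself can stay `Send`. `WorkerInfo`, `RecordWorkers`, `NoopWake` and `drive_on_two_workers` are all made-up names for this illustration.

```rust
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical worker info; the raw-pointer marker makes it `!Send`.
struct WorkerInfo {
    id: usize,
    _non_send: PhantomData<*mut ()>,
}

/// Hypothetical variant of `Future`: the executor hands the current
/// worker to every `poll` call, so the task never stores it.
trait WorkerFuture {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>, worker: &WorkerInfo)
        -> Poll<Self::Output>;
}

/// Hypothetical task: records the id of each worker it is polled on and
/// finishes after two polls. It only holds `usize`s, so it is `Send`.
struct RecordWorkers {
    seen: Vec<usize>,
}

impl WorkerFuture for RecordWorkers {
    type Output = Vec<usize>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>, worker: &WorkerInfo)
        -> Poll<Vec<usize>> {
        self.seen.push(worker.id);
        if self.seen.len() < 2 {
            // Ask to be polled again, possibly on a different worker.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(std::mem::take(&mut self.seen))
        }
    }
}

/// A waker that does nothing; enough for this single-threaded simulation.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Compile-time check that a value's type is `Send`.
fn assert_send<T: Send>(_: &T) {}

/// Simulates an executor polling the same task on worker 1, then worker 2.
fn drive_on_two_workers() -> Vec<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut task = RecordWorkers { seen: Vec::new() };
    assert_send(&task); // the task itself may move between threads

    for id in 1..=2 {
        let worker = WorkerInfo { id, _non_send: PhantomData };
        if let Poll::Ready(seen) = Pin::new(&mut task).poll(&mut cx, &worker) {
            return seen;
        }
    }
    unreachable!("RecordWorkers finishes after two polls")
}

fn main() {
    println!("{:?}", drive_on_two_workers()); // prints [1, 2]
}
```

The cost of this design is that it gives up `async`/`.await` syntax entirely, since the compiler only generates `std::future::Future` implementations.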
Upvotes: 1
Reputation: 43842
It's okay for a future to use `!Send` values that it temporarily holds, as long as those don't cross an await point. But in this case, the future owns a `!Send` value, so it can't be `Send`. In particular, consider the moment at which the expression
```rust
async {
    let t2 = t1.coop_yield().await;
}
```
has been evaluated, before the future is polled. (`async` blocks (and `async fn`s) don't execute any of the code inside them until the first time they're polled.) This is an `impl Future`, and it currently owns `t1`, which is `!Send`, so it must be `!Send` itself.
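The difference between "temporarily holds" and "owns" can be checked with a small sketch, using `std::rc::Rc` as a stand-in `!Send` type. The `Rc` below is created and dropped before the `.await`, so it never becomes part of the future's stored state and the future is still `Send`. `assert_send`, `yield_point`, `uses_rc_temporarily` and `future_is_send` are illustrative names, not from the question.

```rust
use std::rc::Rc;

/// Illustrative helper: accepts only values whose type is `Send`.
/// It does nothing at runtime, so calling it is a compile-time check.
fn assert_send<T: Send>(_: &T) {}

/// Illustrative await point.
async fn yield_point() {}

/// `Rc` is `!Send`, but it is created and dropped before the `.await`,
/// so it is not held across the await point.
async fn uses_rc_temporarily() -> usize {
    let len = {
        let local = Rc::new(vec![1, 2, 3]);
        local.len()
    }; // `local` is dropped here, before the await below
    yield_point().await;
    len
}

/// Returns `true`; the real check is that this compiles at all.
fn future_is_send() -> bool {
    let fut = uses_rc_temporarily();
    assert_send(&fut);
    // By contrast, moving an `Rc` into an async block before it is polled,
    // e.g. `let rc = Rc::new(1); async move { drop(rc) }`, gives a future
    // that owns a `!Send` value, and `assert_send` would reject it.
    true
}

fn main() {
    println!("{}", future_is_send()); // prints true
}
```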
So, one thing we can do is change the code so that it uses `t1` immediately without putting it in the future:
```rust
spawn(|t1| {
    let j = t1.coop_yield();
    async {
        let t2 = j.await;
    }
});
```
However, that probably doesn't suit your goals. And it doesn't work anyway: `WorkerJumper` isn't `Send` either, because it contains an `Option<WorkerInfo>`, which is `!Send`.
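The "use it immediately" pattern only helps when whatever the async block still needs afterwards is itself `Send`. A minimal sketch under that assumption: a hypothetical `!Send` context type `WorkerCtx` is consumed synchronously in the closure body, and the async block captures only a plain `usize` extracted from it. `WorkerCtx`, `spawn_with_ctx`, `build_task` and `poll_once` are made-up names; `spawn_with_ctx` only boxes the task as `dyn Future + Send` rather than running it on another thread.

```rust
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `WorkerLocal`: the raw-pointer marker makes it `!Send`.
struct WorkerCtx {
    id: usize,
    _not_send: PhantomData<*mut ()>,
}

type BoxedTask = Pin<Box<dyn Future<Output = usize> + Send>>;

/// Hypothetical `spawn`-like function: the task it builds must be `Send`.
/// It just boxes the task so the example can poll it afterwards.
fn spawn_with_ctx<F>(ctx: WorkerCtx, f: impl FnOnce(WorkerCtx) -> F) -> BoxedTask
where
    F: Future<Output = usize> + Send + 'static,
{
    Box::pin(f(ctx))
}

fn build_task() -> BoxedTask {
    let ctx = WorkerCtx { id: 1, _not_send: PhantomData };
    spawn_with_ctx(ctx, |ctx| {
        // Use the `!Send` value synchronously, in the closure body...
        let id = ctx.id;
        drop(ctx);
        // ...so the async block only captures `id: usize`, which is `Send`.
        async move { id * 10 }
    })
}

/// A waker that does nothing when woken.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the task once with a do-nothing waker.
fn poll_once(mut task: BoxedTask) -> Option<usize> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match task.as_mut().poll(&mut cx) {
        Poll::Ready(v) => Some(v),
        Poll::Pending => None,
    }
}

fn main() {
    println!("{:?}", poll_once(build_task())); // prints Some(10)
}
```

If the async block instead kept `ctx` (or anything like `WorkerJumper` holding an `Option<WorkerInfo>`), the `Send` bound on `F` would fail, exactly as in the question.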
In general, the problems you're running into seem to be that you want analysis of `Send`-ness that is based on the states of various values (e.g. the premise that the `Option` is always `None` when jumping threads). But that's not how the compiler works: a given type is either `Send` or not. So you need to make sure that, for each type in your system (including the anonymous types of async blocks), the ones that need to be `Send` never contain anything `!Send`.
I don't have a solution for your particular design goals, but I hope this explanation helps you understand how to collaborate with the compiler.
Upvotes: 1