Reputation: 23
In C#, we can build an awaitable Object with implementing the INotifyCompletion
interface.
public class MyAwaiter<T> : INotifyCompletion
{
public bool IsCompleted { get; private set; }
public T GetResult()
{
throw new NotImplementedException();
}
public void OnCompleted(Action continuation)
{
throw new NotImplementedException();
}
}
But in Rust, I don't know how to build an async function to support operations that are not currently supported in existing asynchronous libraries, such as communicating with low-level devices.
Could you give me an example of self-implementing async function in rust?
Upvotes: 2
Views: 840
Reputation: 466
You'll need to implement the Future
trait on a struct, so let's look at std's definition of Future
, specifically, it's .poll
method:
When a future is not ready yet, poll returns
Poll::Pending
and stores a clone of theWaker
copied from the currentContext
. ThisWaker
is then woken once the future can make progress. For example, a future waiting for a socket to become readable would call.clone()
on theWaker
and store it.
One way to use this with some asynchronous mechanism given by the OS would be to send the cloned Waker
to a newly spawned thread (or ideally, a thread pool where you can enqueue events to wake) that blocks on the event that you set up and calls wake()
when it's done.
In this example I used sleeping on a thread, but by either using Mio as suggest by a commenter or directly IOCP, you can get pretty similar code, the important aspect is just waking the Waker
and notifying the Future
that it happened.
struct MyEvent {
is_ready: Arc<AtomicBool>, // Could use a channel to transfer when the task is ready instead
is_polled: bool, // Prevents multiple events to get enqueued on the same future
}
impl MyEvent {
fn new() -> Self {
MyEvent {
is_ready: Arc::new(AtomicBool::new(false)),
is_polled: false,
}
}
}
impl Future for MyEvent {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.is_ready.load(atomic::Ordering::SeqCst) {
true => Poll::Ready(()),
false => {
if self.is_polled {
Poll::Pending
} else {
let waker = cx.waker().clone();
let channel = Arc::clone(&self.is_ready);
self.get_mut().is_polled = true;
thread::spawn(move || {
// Here you block based on whatever event
thread::sleep(Duration::from_secs(5));
channel.store(true, atomic::Ordering::SeqCst);
waker.wake();
});
Poll::Pending
}
}
}
}
}
EDIT: I just noticed that you need to update the waker whenever a new poll is made (althought this shouldn't happen with most executors, as they should repoll only when the Waker
gets woke). The solution isn't trivial and I'd suggest the reader to check the Futures crate in both its source code and the provided channels (oneshot
) and AtomicWaker
, which should make this much simpler. If an actual implementation that works around this issue is requested, I'll try working on a simple POC.
Upvotes: 1