于国瑞

Reputation: 23

How to implement an async function with Windows IOCP in Rust?

In C#, we can build an awaitable object by implementing the INotifyCompletion interface.

using System;
using System.Runtime.CompilerServices;

public class MyAwaiter<T> : INotifyCompletion
{
    public bool IsCompleted { get; private set; }
    public T GetResult()
    {
        throw new NotImplementedException();
    }
    public void OnCompleted(Action continuation)
    {
        throw new NotImplementedException();
    }
}

But in Rust, I don't know how to build an async function for operations that the existing asynchronous libraries don't support yet, such as communicating with low-level devices.

Could you give me an example of implementing an async function myself in Rust?

Upvotes: 2

Views: 840

Answers (1)

mgostIH

Reputation: 466

You'll need to implement the Future trait on a struct, so let's look at std's definition of Future, specifically the documentation of its .poll method:

When a future is not ready yet, poll returns Poll::Pending and stores a clone of the Waker copied from the current Context. This Waker is then woken once the future can make progress. For example, a future waiting for a socket to become readable would call .clone() on the Waker and store it.

One way to use this with some asynchronous mechanism given by the OS would be to send the cloned Waker to a newly spawned thread (or ideally, a thread pool where you can enqueue events to wake) that blocks on the event that you set up and calls wake() when it's done.
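To make the "thread pool where you can enqueue events" idea a bit more concrete, here is a minimal sketch with a single long-lived worker fed through a std channel. `WakeRequest` and `spawn_worker` are names made up for this illustration, and the sleep is only a stand-in for blocking on the real OS event (with IOCP, the worker would presumably block in `GetQueuedCompletionStatus` instead).

```rust
use std::sync::mpsc::{self, Sender};
use std::task::Waker;
use std::thread;
use std::time::Duration;

/// One unit of work for the worker (hypothetical type).
struct WakeRequest {
    wait: Duration, // stand-in for the OS event to block on
    waker: Waker,   // clone of the waker taken from the polling Context
}

/// Starts a single long-lived worker and returns the handle used to
/// enqueue requests from a future's `poll`.
fn spawn_worker() -> Sender<WakeRequest> {
    let (sender, receiver) = mpsc::channel::<WakeRequest>();
    thread::spawn(move || {
        // The loop ends once every `Sender` has been dropped.
        for request in receiver {
            thread::sleep(request.wait);
            request.waker.wake();
        }
    });
    sender
}
```

A future would send `WakeRequest { wait, waker: cx.waker().clone() }` the first time it is polled. Note that this worker handles requests one after another, so a long wait delays everything queued behind it.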

In the example below I sleep on a thread, but by using either Mio, as suggested by a commenter, or IOCP directly, you can get pretty similar code. The important part is just waking the Waker and letting the Future know that the event happened.

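use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicBool};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;

struct MyEvent {
    is_ready: Arc<AtomicBool>, // Set by the worker thread; a channel could carry the result instead
    is_polled: bool,           // Prevents the same future from enqueuing more than one event
}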

impl MyEvent {
    fn new() -> Self {
        MyEvent {
            is_ready: Arc::new(AtomicBool::new(false)),
            is_polled: false,
        }
    }
}

impl Future for MyEvent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.is_ready.load(atomic::Ordering::SeqCst) {
            return Poll::Ready(());
        }
        if !self.is_polled {
            // First poll: hand a clone of the waker to a thread that waits for the event.
            let waker = cx.waker().clone();
            let is_ready = Arc::clone(&self.is_ready);
            self.get_mut().is_polled = true;
            thread::spawn(move || {
                // Block here on whatever event you are waiting for.
                thread::sleep(Duration::from_secs(5));
                is_ready.store(true, atomic::Ordering::SeqCst);
                waker.wake();
            });
        }
        Poll::Pending
    }
}
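To try a future like this without pulling in a runtime, something has to poll it. The following is a minimal sketch of a single-threaded executor built only on std; `block_on` and `ThreadWaker` are names chosen for this illustration (real runtimes ship their own, more capable versions).

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical helper: drives one future to completion on the current thread.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the heap so it can be polled repeatedly.
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until some clone of the waker calls `wake()`.
            // A spurious wake-up only costs one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}
```

With the `MyEvent` type above, `block_on(MyEvent::new())` should return after roughly five seconds: the first poll spawns the thread, the executor parks, and `waker.wake()` unparks it for the final poll.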

EDIT: I just noticed that the stored waker needs to be updated whenever the future is polled again (although this shouldn't happen with most executors, as they should re-poll only when the Waker has been woken). The solution isn't trivial, and I'd suggest that readers look at the futures crate, both its source code and the channels (oneshot) and AtomicWaker it provides, which should make this much simpler. If an actual implementation that works around this issue is requested, I'll try working on a simple POC.
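One possible way around the stale-waker problem, sketched here with a plain `Mutex` rather than the `AtomicWaker` mentioned above: keep the completion flag and the most recent waker in shared state, and overwrite the waker on every poll. `DelayEvent`, `Shared`, `Flag`, and `woken_after_two_polls` are hypothetical names, and the sleep again stands in for the real event.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared by the future and the thread waiting for the event.
struct Shared {
    done: bool,
    waker: Option<Waker>, // always the waker from the most recent poll
}

/// Hypothetical variant of `MyEvent` that keeps its waker up to date.
struct DelayEvent {
    shared: Arc<Mutex<Shared>>,
}

impl DelayEvent {
    /// Unlike `MyEvent`, the worker starts here rather than on the first poll.
    fn new(delay: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let worker_state = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(delay); // stand-in for blocking on the real event
            let waker = {
                let mut state = worker_state.lock().unwrap();
                state.done = true;
                state.waker.take()
            }; // the lock is released here, before waking
            if let Some(waker) = waker {
                waker.wake();
            }
        });
        DelayEvent { shared }
    }
}

impl Future for DelayEvent {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            return Poll::Ready(());
        }
        // Overwrite whatever waker an earlier poll stored.
        state.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Demo waker that records whether it was woken.
struct Flag(AtomicBool);

impl Wake for Flag {
    fn wake(self: Arc<Self>) {
        self.0.store(true, Ordering::SeqCst);
    }
}

/// Polls once with each of two wakers and reports which ones were woken.
fn woken_after_two_polls() -> (bool, bool) {
    let first = Arc::new(Flag(AtomicBool::new(false)));
    let second = Arc::new(Flag(AtomicBool::new(false)));
    let mut event = DelayEvent::new(Duration::from_millis(50));

    for flag in [&first, &second] {
        let waker = Waker::from(Arc::clone(flag));
        let mut cx = Context::from_waker(&waker);
        let _ = Pin::new(&mut event).poll(&mut cx);
    }
    thread::sleep(Duration::from_millis(300)); // give the worker time to finish
    (first.0.load(Ordering::SeqCst), second.0.load(Ordering::SeqCst))
}
```

Because the second poll replaces the stored waker, only the second `Flag` should end up set. The mutex keeps the "set done, take waker" step on the worker and the "check done, store waker" step in `poll` from interleaving, which is the race a bare `AtomicBool` plus a separately stored waker would leave open.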

Upvotes: 1
