Ervadac

Reputation: 956

AsyncRead for UdpSocket

I'm trying to implement AsyncRead for a UdpSocket, which has an async recv function, and I'm having difficulty calling poll on my future:

use async_std::{
    io::Read as AsyncRead,
    net::UdpSocket,
};

struct MyUdpWrapper {
    socket: Arc<UdpSocket>,
    fut: Option<Box<dyn Future<Output = Result<usize>>>>,
}

impl AsyncRead for MyUdpWrapper {
    fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        let this = Pin::into_inner(self);
        let fut = this.fut.unwrap_or(Box::new(this.socket.recv(buf)));
        let fut = unsafe { Pin::new_unchecked(&mut *fut) };
        fut.poll(cx)
    }
}

I was thinking of storing the future in an Option so that it continues to live after the first poll, but I'm not clear on how to do that exactly. The async recv() returns a Result, so I guess I should store an Option<Box<dyn Future<Output = Result<usize>>>>, but that seems a bit cumbersome?

Is there a good way to call an async function in poll_read (or poll_* in general)?

Upvotes: 1

Views: 512

Answers (1)

Coder-256

Reputation: 5618

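Your code is actually unsound: the future returned by UdpSocket::recv() borrows the provided buffer, so the buffer must stay valid until that future is dropped, but the buffer given to poll_read() is only borrowed for the duration of that one call. I also think you want something more like this.fut.get_or_insert_with() instead of this.fut.unwrap_or(): unwrap_or() moves the value out of the Option and builds its default eagerly, so nothing is stored for the next poll.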
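
To illustrate the get_or_insert_with() point in isolation, here is a minimal sketch using only the standard library. Holder, YieldOnce and NoopWake are hypothetical names made up for this example, and YieldOnce stands in for a future that owns all of its data (unlike recv(), which borrows the buffer). The `created` counter exists only to show that the future is built once and then reused on later polls.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical future: Pending on the first poll, Ready(42) on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = usize;
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<usize> {
        if self.0 {
            Poll::Ready(42)
        } else {
            self.0 = true;
            // Ask to be polled again, as a real future would via its reactor.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Waker that does nothing; enough to poll by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

struct Holder {
    // Pin<Box<..>> lets us poll the stored future without `unsafe`.
    fut: Option<Pin<Box<dyn Future<Output = usize>>>>,
    // Counts how many times a new future was created.
    created: usize,
}

impl Holder {
    fn poll_once(&mut self, cx: &mut Context<'_>) -> Poll<usize> {
        let created = &mut self.created;
        // Create the future only if none is stored, and keep it in the Option.
        let fut = self.fut.get_or_insert_with(|| {
            *created += 1;
            let f: Pin<Box<dyn Future<Output = usize>>> = Box::pin(YieldOnce(false));
            f
        });
        let res = fut.as_mut().poll(cx);
        if res.is_ready() {
            // Drop the finished future so the next call starts a fresh one.
            self.fut = None;
        }
        res
    }
}
```

Polling a Holder twice with a waker built via Waker::from(Arc::new(NoopWake)) should return Pending and then Ready(42), with `created` staying at 1. This only works as-is because the stored future owns its data; it does not solve the buffer-borrowing problem described above.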

Under the hood, async_std::net::UdpSocket::recv() actually uses async_io::Async::<std::net::UdpSocket>::recv(), which calls the read_with() method.

We need to use async_io::Async<std::net::UdpSocket> directly. Async<T> already implements AsyncRead when T: Read. This is not the case for UdpSocket, but we can write a similar implementation:

use std::{
    future::Future,
    net::UdpSocket,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use async_io::Async;
use async_std::{io::Read as AsyncRead, io::Result};

struct MyUdpWrapper {
    socket: Arc<Async<UdpSocket>>,
}

impl AsyncRead for MyUdpWrapper {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<Result<usize>> {
        loop {
            // get_ref() returns the inner std::net::UdpSocket, so this calls its recv()
            match self.socket.get_ref().recv(buf) {
                Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {}
                res => return Poll::Ready(res),
            }
            match Pin::new(&mut self.socket.readable()).poll(cx) {
                Poll::Ready(res) => res?,
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

(Note: it's a bit unclear to me why async_io::Async::poll_read() uses poll_readable() but async_std::net::UdpSocket::recv() uses readable(); it seems that only the latter registers the current task for wakeup, so that's what I chose here).
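
The first half of the loop above (try the non-blocking call, treat WouldBlock as "not ready yet") can be sketched with the standard library alone. try_recv is a hypothetical helper name, and the sketch assumes the socket was already put into non-blocking mode with set_nonblocking(true). It deliberately leaves out waker registration, so on its own it is not a valid poll_read(): something like the readable() step above is still needed to get woken up.

```rust
use std::io;
use std::net::UdpSocket;
use std::task::Poll;

// Hypothetical helper: one non-blocking receive attempt, expressed as a Poll.
// Assumes `socket` is already in non-blocking mode.
fn try_recv(socket: &UdpSocket, buf: &mut [u8]) -> Poll<io::Result<usize>> {
    match socket.recv(buf) {
        // No datagram queued yet: the caller must wait for readiness and retry.
        Err(err) if err.kind() == io::ErrorKind::WouldBlock => Poll::Pending,
        // Either a datagram was read or a real error occurred.
        res => Poll::Ready(res),
    }
}
```

Because the buffer is only touched during the call to recv(), nothing borrows it across polls, which is what avoids the lifetime problem in the original code.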

Upvotes: 2
