Aitch
Aitch

Reputation: 1697

How do I poll a future twice in a `fn poll_..`?

I'm implementing AsyncRead for a struct, that should decompress data.

I'm using pin project and my problem is, that whenever I call let this = self.project(), it consumes self and whenever I call this.reader.poll_read(..) afterwards, it consumes this.reader so I can't poll again and I can't project again, but I need to call it multiple times, in order to discover how many bytes need to be read next.

  1. just returning with Ok is not possible, because per definition if poll_read(..) doesn't fill at least one byte it means EOF, but in our case we can't fill one byte, but need to read more bytes from the reader.
  2. returning with Pending has to wake the task again, but we actually need to run it again immediately.

So I though of calling the waker and returning Pending, should run my poll_read(..) again.

cx.waker().wake_by_ref(); // call me again?
return Poll::Pending;

That actually works, but is it really the correct way?

Sample code from the poll_read(..):

#[pin_project]
pub struct AsyncDecoder<R> {
    #[pin]
    reader: R,
    state: DecoderState,
}

impl<R> AsyncRead for AsyncDecoder<R> where R: AsyncRead {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {

        let this = self.project();
        // ..

        // 1. read file header (4 bytes)
        this.reader.poll_read(cx, ..)?;
        // ..

        // 2. read block header (n bytes)
        this.reader.poll_read(cx, ..)?;
        // ..

        // 3. read compressed data block (n bytes)
        this.reader.poll_read(cx, ..)?;
        // ..

        // repeat 2. + 3. until EOF
    }
}

Upvotes: 1

Views: 1000

Answers (2)

Chayim Friedman
Chayim Friedman

Reputation: 71525

Using the waker works but adds unnecessary overhead. The way to replicate a Pin<&mut T> is by using Pin::as_mut():

impl<R> AsyncRead for AsyncDecoder<R>
where
    R: AsyncRead,
{
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // let this = self.project();
        // ..

        // 1. read file header (4 bytes)
        self.as_mut().project().reader.poll_read(cx, ...)?;
        // ..

        // 2. read block header (n bytes)
        self.as_mut().project().reader.poll_read(cx, ...)?;
        // ..

        // 3. read compressed data block (n bytes)
        self.project().reader.poll_read(cx, ...)?;
        // ..

        // repeat 2. + 3. until EOF
    }
}

Upvotes: 3

Colonel Thirty Two
Colonel Thirty Two

Reputation: 26609

Yes, it should be fine.

An async runtime has to support the cx being woken before a poller returning, because the cx may have been sent to another thread that will have no idea whether or not the original thread has returned or not.

Upvotes: 1

Related Questions