Reputation: 1697
I'm implementing AsyncRead for a struct, that should decompress data.
I'm using pin project and my problem is, that whenever I call let this = self.project()
, it consumes self
and whenever I call this.reader.poll_read(..)
afterwards, it consumes this.reader
so I can't poll again and I can't project again, but I need to call it multiple times, in order to discover how many bytes need to be read next.
poll_read(..)
doesn't fill at least one byte it means EOF, but in our case we can't fill one byte, but need to read more bytes from the reader.So I though of calling the waker and returning Pending, should run my poll_read(..)
again.
cx.waker().wake_by_ref(); // call me again?
return Poll::Pending;
That actually works, but is it really the correct way?
Sample code from the poll_read(..)
:
#[pin_project]
pub struct AsyncDecoder<R> {
#[pin]
reader: R,
state: DecoderState,
}
impl<R> AsyncRead for AsyncDecoder<R> where R: AsyncRead {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let this = self.project();
// ..
// 1. read file header (4 bytes)
this.reader.poll_read(cx, ..)?;
// ..
// 2. read block header (n bytes)
this.reader.poll_read(cx, ..)?;
// ..
// 3. read compressed data block (n bytes)
this.reader.poll_read(cx, ..)?;
// ..
// repeat 2. + 3. until EOF
}
}
Upvotes: 1
Views: 1000
Reputation: 71525
Using the waker works but adds unnecessary overhead. The way to replicate a Pin<&mut T>
is by using Pin::as_mut()
:
impl<R> AsyncRead for AsyncDecoder<R>
where
R: AsyncRead,
{
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// let this = self.project();
// ..
// 1. read file header (4 bytes)
self.as_mut().project().reader.poll_read(cx, ...)?;
// ..
// 2. read block header (n bytes)
self.as_mut().project().reader.poll_read(cx, ...)?;
// ..
// 3. read compressed data block (n bytes)
self.project().reader.poll_read(cx, ...)?;
// ..
// repeat 2. + 3. until EOF
}
}
Upvotes: 3
Reputation: 26609
Yes, it should be fine.
An async runtime has to support the cx
being woken before a poller returning, because the cx
may have been sent to another thread that will have no idea whether or not the original thread has returned or not.
Upvotes: 1