zoska

Reputation: 1724

How to implement tokio::io::AsyncRead for reqwest::Response

As an exercise, I decided to try implementing the tokio::io::AsyncRead trait for the reqwest::Response type. I call it an exercise because I am well aware of the easy route via tokio_util::io::StreamReader. After spending three days gluing together different solutions to similar problems, this is what I've come up with so far:

use std::future::Future;
use std::pin::{pin, Pin};
use std::task::Poll;

use bytes::Bytes;
use reqwest::Response;
use reqwest::Result;

use tokio::io::AsyncRead;
use pin_project::pin_project;


#[pin_project]
struct ResponseStreamPin {
    inner: Response,
    future: Option<Pin<Box<dyn Future<Output = Result<Option<Bytes>>>>>>
}

impl AsyncRead for ResponseStreamPin {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        let s = self.as_mut().project();

        if let Some(f) = s.future {
            match f.as_mut().poll(cx) {
                Poll::Ready(result) => {
                    s.future.take();
                    match result {
                        Ok(maybe_bytes) => {
                            if let Some(bytes) = maybe_bytes {
                                buf.put_slice(&bytes);
                            };
                            return Poll::Ready(Ok(()))
                        },
                        Err(_) => return Poll::from(Err(std::io::Error::last_os_error())),
                    }
                },
                _ => return Poll::Pending,
            }
        }

        let future = Box::pin(s.inner.chunk());
        *s.future = Some(future);
        return self.poll_read(cx, buf);
    }
}

Of course, this does not compile:

error[E0597]: `self` does not live long enough
  --> client/src/async_client/response_stream.rs:26:17
   |
22 |         mut self: Pin<&mut Self>,
   |         -------- binding `self` declared here
...
26 |         let s = self.as_mut().project();
   |                 ^^^^ borrowed value does not live long enough
...
47 |         *s.future = Some(future);
   |                          ------ cast requires that `self` is borrowed for `'static`
48 |         return self.poll_read(cx, buf);
49 |     }
   |      - `self` dropped here while still borrowed

error: lifetime may not live long enough
  --> client/src/async_client/response_stream.rs:47:26
   |
22 |         mut self: Pin<&mut Self>,
   |                       - let's call the lifetime of this reference `'1`
...
47 |         *s.future = Some(future);
   |                          ^^^^^^ cast requires that `'1` must outlive `'static`

error[E0505]: cannot move out of `self` because it is borrowed
  --> client/src/async_client/response_stream.rs:48:16
   |
22 |         mut self: Pin<&mut Self>,
   |         -------- binding `self` declared here
...
26 |         let s = self.as_mut().project();
   |                 ---- borrow of `self` occurs here
...
47 |         *s.future = Some(future);
   |                          ------ cast requires that `self` is borrowed for `'static`
48 |         return self.poll_read(cx, buf);
   |                ^^^^ move out of `self` occurs here

Some errors have detailed explanations: E0505, E0597.
For more information about an error, try `rustc --explain E0505`.

I'm stumped. What can I do next? I suspect something in pin_project would let this work, but I couldn't find any detailed pin_project examples.
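For reference, the errors seem to come from the boxed future: `Box<dyn Future<...>>` defaults to `'static`, while the future returned by `chunk()` borrows `inner` mutably. One direction that might sidestep this is to move the response into the future and have the future hand it back along with the chunk, so nothing is borrowed from `self`. Below is a std-only sketch of that idea. `FakeResponse`, `ChunkReader`, `poll_chunk`, and `drain` are hypothetical names standing in for the real types, and the fake `chunk()` only imitates the shape of `Response::chunk`; this is not tested against reqwest or tokio.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `reqwest::Response`: hands out owned chunks.
struct FakeResponse {
    chunks: Vec<Vec<u8>>,
}

impl FakeResponse {
    /// Imitates the shape of `Response::chunk`: borrows `self` mutably.
    async fn chunk(&mut self) -> Option<Vec<u8>> {
        if self.chunks.is_empty() {
            None
        } else {
            Some(self.chunks.remove(0))
        }
    }
}

/// The future owns the response and returns it together with the chunk,
/// so it borrows nothing from the reader and can be boxed as `'static`.
type ChunkFuture = Pin<Box<dyn Future<Output = (FakeResponse, Option<Vec<u8>>)>>>;

enum State {
    Idle(FakeResponse),
    Reading(ChunkFuture),
    Done,
}

struct ChunkReader {
    state: State,
}

impl ChunkReader {
    fn new(resp: FakeResponse) -> Self {
        Self { state: State::Idle(resp) }
    }

    /// Polls for the next chunk; `None` means the body is exhausted.
    /// A loop replaces the recursive `self.poll_read(cx, buf)` call.
    fn poll_chunk(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<u8>>> {
        loop {
            // Take the state by value; every branch that continues puts one back.
            match std::mem::replace(&mut self.state, State::Done) {
                State::Idle(mut resp) => {
                    // Move the response into the future instead of borrowing it.
                    self.state = State::Reading(Box::pin(async move {
                        let chunk = resp.chunk().await;
                        (resp, chunk)
                    }));
                }
                State::Reading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready((resp, Some(chunk))) => {
                        self.state = State::Idle(resp);
                        return Poll::Ready(Some(chunk));
                    }
                    // End of body: the state stays `Done`.
                    Poll::Ready((_, None)) => return Poll::Ready(None),
                    Poll::Pending => {
                        self.state = State::Reading(fut);
                        return Poll::Pending;
                    }
                },
                State::Done => return Poll::Ready(None),
            }
        }
    }
}

/// Waker that does nothing; enough here because the fake never returns `Pending`.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Collects all chunks by polling until the reader reports the end.
fn drain(reader: &mut ChunkReader) -> Vec<u8> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut out = Vec::new();
    while let Poll::Ready(Some(chunk)) = reader.poll_chunk(&mut cx) {
        out.extend_from_slice(&chunk);
    }
    out
}
```

Because every field in this sketch is `Unpin`, the struct needs no pin projection; in a real `poll_read` the same state machine could presumably be reached through `self.get_mut()`, assuming the wrapper around `Response` is `Unpin` as well. A real implementation would also have to keep leftover bytes when a chunk is larger than the space left in `buf`, since `ReadBuf::put_slice` panics if the slice does not fit.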

Upvotes: 0

Views: 341

Answers (0)
