molf

Reputation: 75025

How to copy data from a stream while also forwarding a stream

I am using hyper 0.12 to build a proxy service. When receiving a response body from the upstream server I want to forward it back to the client ASAP, and save the contents in a buffer for later processing.

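So I need a function that:

- takes a hyper::Body (a stream of hyper::Chunk);
- returns a hyper::Body that yields the same chunks, forwarded as soon as they arrive;
- returns a future that resolves to a Vec<u8> containing the full body once it has been read.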

I can't for the life of me figure out how to do this.

I guess the function I'm looking for will look something like this:

use futures::{Future, Stream};

type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let body2 = ... // ???
    let buffer = body.fold(Vec::<u8>::new(), |mut buf, chunk| {
        buf.extend_from_slice(&chunk);
        // ...somehow send this chunk to body2 also?
        Ok::<_, hyper::Error>(buf)
    });
    (body2, Box::new(buffer.map_err(|_| ())))
}

Below is what I have tried, and it works until send_data() fails (obviously).

use futures::{future, Future, Stream};

type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()>>;
pub fn copy_body(body: hyper::Body) -> (hyper::Body, BufferFuture) {
    let (mut sender, body2) = hyper::Body::channel();
    let consume =
        body.map_err(|_| ()).fold(Vec::<u8>::new(), move |mut buf, chunk| {
            buf.extend_from_slice(&chunk);

            // What to do if this fails?
            if sender.send_data(chunk).is_err() {}
            future::ok::<_, ()>(buf)
        });

    (body2, Box::new(consume))
}

However, something tells me I am on the wrong track.
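A likely cause of the failure is backpressure. This is my reading of the hyper 0.12 docs, which say `Sender::poll_ready` should report readiness before `send_data` is called: the channel has limited capacity, and a send is rejected while the other side has not drained it yet. The std-only sketch below reproduces that behaviour with a bounded `std::sync::mpsc::sync_channel`; it is an analogy, not hyper's implementation, and `try_forward_all` is a hypothetical helper.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical helper: forwards every chunk with `try_send` and counts
/// how many chunks were rejected instead of delivered.
pub fn try_forward_all(chunks: Vec<Vec<u8>>, tx: &SyncSender<Vec<u8>>) -> usize {
    let mut rejected = 0;
    for chunk in chunks {
        match tx.try_send(chunk) {
            Ok(()) => {}
            // The receiver has not drained the channel yet. Dropping the
            // chunk here is exactly the data loss the question worries about.
            Err(TrySendError::Full(_chunk)) => rejected += 1,
            // The receiving side is gone (e.g. the client disconnected).
            Err(TrySendError::Disconnected(_chunk)) => rejected += 1,
        }
    }
    rejected
}
```

With a capacity of 1 and nobody reading, only the first chunk gets through. Dropping on `Full` silently loses data; the alternative is to wait until the receiver has room, which is what checking `poll_ready` first is meant to allow.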

I have found Sink.fanout() which seems like it is what I want, but I do not have a Sink, and I don't know how to construct one. hyper::Body implements Stream but not Sink.
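The reason a fan-out needs `Clone` is that every item must reach two destinations, so at least one of them receives a copy. The std-only sketch below shows the same idea with two channels standing in for sinks; `fan_out` is a hypothetical name, not part of any library.

```rust
use std::sync::mpsc::Sender;

/// Sends every item to both senders. The first destination receives a
/// clone and the second receives the original, which is why `T: Clone`
/// is required. Returns the number of items sent.
pub fn fan_out<T: Clone, I: IntoIterator<Item = T>>(
    items: I,
    a: &Sender<T>,
    b: &Sender<T>,
) -> usize {
    let mut count = 0;
    for item in items {
        // Send errors are ignored here; a real proxy would need a policy
        // for a destination that has disconnected.
        let _ = a.send(item.clone());
        let _ = b.send(item);
        count += 1;
    }
    count
}
```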

Upvotes: 5

Views: 1801

Answers (2)

Flavio Del Grosso

Reputation: 600

In hyper v1 things changed quite a lot. To copy data from a hyper stream while simultaneously forwarding it, I implemented a custom wrapper around the body that clones each incoming data frame. I struggled with this quite a bit and don't know whether there is a better approach, but here is my proposed solution:

use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use hyper::body::{Body, Incoming};
use tokio::sync::mpsc;

pub struct BodyReader<T> {
  inner: T,
  sender: mpsc::Sender<Bytes>,
}

impl<T: Body<Data = Bytes, Error = hyper::Error> + Unpin> Stream for BodyReader<T> {
  type Item = Result<Bytes, std::io::Error>;

  fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
    loop {
      return match futures::ready!(Pin::new(&mut self.inner).poll_frame(cx)) {
        Some(Ok(frame)) => match frame.into_data() {
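          Ok(buf) => {
            // Send a copy to the receiver. try_send drops the copy if the
            // channel is full or the receiver was dropped; the forwarded
            // stream is unaffected either way.
            let _ = self.sender.try_send(buf.clone());
            Poll::Ready(Some(Ok(buf)))
          }
          // Not a data frame (e.g. trailers): skip it and poll again.
          Err(_) => continue,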
        },
        Some(Err(err)) => Poll::Ready(Some(Err(std::io::Error::other(err)))),
        None => Poll::Ready(None),
      };
    }
  }
}

impl BodyReader<Incoming> {
  pub fn new(body: Incoming) -> (Self, mpsc::Receiver<Bytes>) {
    let (sender, receiver) = mpsc::channel(10);
    let reader = BodyReader {
      inner: body,
      sender,
    };

    (reader, receiver)
  }
}

type ReaderAndReceiver = (BodyReader<Incoming>, mpsc::Receiver<Bytes>);

pub trait BodyReading {
  fn read(self) -> ReaderAndReceiver;
}

impl BodyReading for Incoming {
  fn read(self) -> ReaderAndReceiver {
    BodyReader::new(self)
  }
}
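One thing to watch: with a bounded tokio channel (capacity 10 here), `try_send` fails once the receiver falls behind, and `let _ =` discards that error, so the copy can silently miss chunks while the forwarded stream stays complete. The std-only sketch below mirrors the wrapper's shape with an `Iterator` standing in for `Body`, and assumes an unbounded `std::sync::mpsc::channel` so no copy is dropped (at the cost of unbounded buffering if nothing reads the receiver). `TeeReader` is a hypothetical name; in tokio the analogous choice would be `mpsc::unbounded_channel`.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Wraps an iterator of chunks (standing in for a hyper body), yields
/// every chunk unchanged, and sends a copy of each one to a receiver.
pub struct TeeReader<I> {
    inner: I,
    sender: Sender<Vec<u8>>,
}

impl<I: Iterator<Item = Vec<u8>>> TeeReader<I> {
    pub fn new(inner: I) -> (Self, Receiver<Vec<u8>>) {
        // Unbounded: `send` only fails if the receiver was dropped,
        // never because the channel is full.
        let (sender, receiver) = channel();
        (TeeReader { inner, sender }, receiver)
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for TeeReader<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        let chunk = self.inner.next()?;
        // If nobody is listening any more, keep forwarding anyway.
        let _ = self.sender.send(chunk.clone());
        Some(chunk)
    }
}
```

Once the reader is exhausted and dropped, its `Sender` is dropped as well, so iterating the receiver terminates after the last copy.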

Upvotes: 0

molf

Reputation: 75025

What I ended up doing was implementing a new type of stream that does what I need. This appeared to be necessary because hyper::Body does not implement Sink, nor does hyper::Chunk implement Clone (which Sink.fanout() requires), so I could not use any of the existing combinators.

First, a struct that holds everything we need, with methods to append a new chunk and to signal that the buffer is complete.

use futures::{Async, Future, Stream};

struct BodyClone<T> {
    body: T,
    buffer: Option<Vec<u8>>,
    sender: Option<futures::sync::oneshot::Sender<Vec<u8>>>,
}

impl BodyClone<hyper::Body> {
    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // The receiver may already have been dropped; nothing to do then.
            let _ = sender.send(buffer);
        }
    }

    fn push(&mut self, chunk: &hyper::Chunk) {
        use hyper::body::Payload;

        let length = if let Some(buffer) = self.buffer.as_mut() {
            buffer.extend_from_slice(chunk);
            buffer.len() as u64
        } else {
            0
        };

        if let Some(content_length) = self.body.content_length() {
            if length >= content_length {
                self.flush();
            }
        }
    }
}
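The `push`/`flush` pair relies on two details: `Option::take` makes `flush` a no-op after the first call, and comparing the buffered length with the content length lets the buffer be delivered as soon as the last expected byte arrives, without waiting for end-of-stream. Here is a std-only sketch of just that logic. It assumes a `std::sync::mpsc::Sender` in place of the futures oneshot and an explicit `content_length` field in place of `Payload::content_length()`; `BufferState` is a hypothetical name.

```rust
use std::sync::mpsc::Sender;

pub struct BufferState {
    buffer: Option<Vec<u8>>,
    sender: Option<Sender<Vec<u8>>>,
    // Stand-in for `Payload::content_length()`; `None` means unknown.
    content_length: Option<u64>,
}

impl BufferState {
    pub fn new(sender: Sender<Vec<u8>>, content_length: Option<u64>) -> Self {
        BufferState { buffer: Some(Vec::new()), sender: Some(sender), content_length }
    }

    /// Delivers the buffer at most once: both fields are taken, so any
    /// later call finds `None` and does nothing.
    pub fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            // The receiving side may be gone; there is nothing useful to do then.
            let _ = sender.send(buffer);
        }
    }

    /// Appends a chunk and flushes early once the declared length is reached.
    pub fn push(&mut self, chunk: &[u8]) {
        let length = match self.buffer.as_mut() {
            Some(buffer) => {
                buffer.extend_from_slice(chunk);
                buffer.len() as u64
            }
            None => 0,
        };
        if let Some(content_length) = self.content_length {
            if length >= content_length {
                self.flush();
            }
        }
    }
}
```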

Then I implemented the Stream trait for this struct.

impl Stream for BodyClone<hyper::Body> {
    type Item = hyper::Chunk;
    type Error = hyper::Error;

    fn poll(&mut self) -> futures::Poll<Option<Self::Item>, Self::Error> {
        match self.body.poll() {
            Ok(Async::Ready(Some(chunk))) => {
                self.push(&chunk);
                Ok(Async::Ready(Some(chunk)))
            }
            Ok(Async::Ready(None)) => {
                self.flush();
                Ok(Async::Ready(None))
            }
            other => other,
        }
    }
}
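The `poll` implementation forwards each chunk unchanged, copies it as a side effect, and flushes when the inner stream reports end-of-stream. A consequence worth knowing: the buffer only arrives once the forwarded body has been consumed to the end (or the content length is reached). If the body is dropped early, the oneshot `Sender` is dropped with it and the receiver resolves to an error, which `clone_body` maps to `()`. The std-only sketch below shows both behaviours, assuming a synchronous `Iterator` in place of the futures 0.1 `Stream`; `BodyCloneIter` is a hypothetical name and the content-length shortcut is omitted.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

pub struct BodyCloneIter<I> {
    body: I,
    buffer: Option<Vec<u8>>,
    sender: Option<Sender<Vec<u8>>>,
}

impl<I: Iterator<Item = Vec<u8>>> BodyCloneIter<I> {
    pub fn new(body: I) -> (Self, Receiver<Vec<u8>>) {
        let (sender, receiver) = channel();
        let clone = BodyCloneIter { body, buffer: Some(Vec::new()), sender: Some(sender) };
        (clone, receiver)
    }

    fn flush(&mut self) {
        if let (Some(buffer), Some(sender)) = (self.buffer.take(), self.sender.take()) {
            let _ = sender.send(buffer);
        }
    }
}

impl<I: Iterator<Item = Vec<u8>>> Iterator for BodyCloneIter<I> {
    type Item = Vec<u8>;

    fn next(&mut self) -> Option<Vec<u8>> {
        match self.body.next() {
            // Copy the chunk into the buffer, then forward it unchanged.
            Some(chunk) => {
                if let Some(buffer) = self.buffer.as_mut() {
                    buffer.extend_from_slice(&chunk);
                }
                Some(chunk)
            }
            // End of stream: hand the complete buffer to the receiver.
            None => {
                self.flush();
                None
            }
        }
    }
}
```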

Finally I could define an extension method on hyper::Body:

pub type BufferFuture = Box<dyn Future<Item = Vec<u8>, Error = ()> + Send>;

trait CloneBody {
    fn clone_body(self) -> (hyper::Body, BufferFuture);
}

impl CloneBody for hyper::Body {
    fn clone_body(self) -> (hyper::Body, BufferFuture) {
        let (sender, receiver) = futures::sync::oneshot::channel();

        let cloning_stream = BodyClone {
            body: self,
            buffer: Some(Vec::new()),
            sender: Some(sender),
        };

        (
            hyper::Body::wrap_stream(cloning_stream),
            Box::new(receiver.map_err(|_| ())),
        )
    }
}

This can be used as follows:

let (body, buffer): (hyper::Body, BufferFuture) = body.clone_body();

Upvotes: 5
