Reputation: 4190
I would like to create a small Rust HTTP proxy using hyper which accepts requests, forwards them and dumps the request + body.
Based on this example, the proxy part works fine.
However, I can't simply copy & print the request body. My main problem is that the request body can't be simply copied into something like an Vec<u8>
. I cannot deconstruct
the request to read the body and then create it later since the deconstructed headers can't be added to a new request.
The following code shows my minimal HTTP proxy example:
extern crate futures;
extern crate hyper;
extern crate tokio_core;
use futures::{Future, Stream};
use hyper::{Body, Client, StatusCode};
use hyper::client::HttpConnector;
use hyper::header::{ContentLength, ContentType};
use hyper::server::{Http, Request, Response, Service};
use tokio_core::reactor::Core;
type HTTPClient = Client<HttpConnector, Body>;
struct Server {
client: HTTPClient,
}
impl Server {
pub fn new(client: HTTPClient) -> Server {
Server { client: client }
}
}
impl Service for Server {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn call(&self, mut req: Request) -> Self::Future {
let req_uri_str = {
let uri = req.uri();
format!(
"http://localhost{}?{}",
uri.path(),
uri.query().unwrap_or_default()
)
};
req.set_uri(req_uri_str.parse().unwrap());
// Try to create a copy of the new request
/*
let (method, uri, version, headers, body) = req.deconstruct();
let mut req_copy: Request<hyper::Body> = Request::new(method, uri);
// Main problem: How can the request body be copied?
// >>> let body_bytes: Vec<u8> = ...
req_copy.set_body(body);
req_copy.set_version(version);
// Try to copy the headers
for header in headers.iter() {
req_copy.headers_mut().set(header.value().unwrap());
}
*/
// This works if the request is not deconstructed
let work = self.client
.request(req)
.and_then(|res| futures::future::ok(res))
.or_else(|err| {
let body = format!("{}\n", err);
futures::future::ok(
Response::new()
.with_status(StatusCode::BadRequest)
.with_header(ContentType::plaintext())
.with_header(ContentLength(body.len() as u64))
.with_body(body),
)
});
Box::new(work)
}
}
fn main() {
// Create HTTP client core + handles
let mut core = Core::new().unwrap();
let handle = core.handle();
let handle_clone = handle.clone();
// Create HTTP server
let server_addr = "127.0.0.1:9999".parse().unwrap();
let server = Http::new()
.serve_addr_handle(&server_addr, &handle, move || {
Ok(Server::new(Client::new(&handle_clone)))
})
.unwrap();
// Connect HTTP client with server
let handle_clone2 = handle.clone();
handle.spawn(
server
.for_each(move |conn| {
handle_clone2.spawn(conn.map(|_| ()).map_err(|err| println!("Error: {:?}", err)));
Ok(())
})
.map_err(|_| ()),
);
core.run(futures::future::empty::<(), ()>()).unwrap();
}
Running this works fine, if you have any HTTP service running on Port 80, connecting with a browser to port 9999 will forward any responses and requests perfectly.
However, if you re-enable the lines regarding the construction of a new, copied request, my approach fails since I don't understand how to copy the headers. (Furthermore, this doesn't really help me when it comes to copying the request body)
I'm aware that there are similar questions here, but none of them match my requirement to re-use the request body after looking at it (or don't have answers at all).
Upvotes: 3
Views: 4539
Reputation: 430270
the request body can't be simply copied into something like an
Vec<u8>
Sure it can. In the Rust standard library, it's worth memorizing the capabilities of the Iterator
trait. When dealing with futures, you should also memorize the capabilities of Future
and Stream
.
For example, hyper's Body
implements Stream
. This means you can use the Stream::concat2
method:
Concatenate all results of a stream into a single extendable destination, returning a future representing the end result.
This creates one large Chunk
which can be converted to a Vec
:
extern crate hyper; // 0.11.22
extern crate futures; // 0.1.18
use futures::{Future, Stream};
fn example(req: hyper::Request) {
req.body().concat2().map(|chunk| {
let body = chunk.to_vec();
println!("{:?}", body);
()
});
// Use this future somehow!
}
Likewise, a Vec<u8>
can be converted back into a Body
.
since the deconstructed headers can't be added to a new request.
req_copy.headers_mut().extend(headers.iter());
All together:
fn create_localhost_request(req: Request) -> (Request, Body) {
let (method, uri, version, headers, body) = req.deconstruct();
let req_uri_str = {
format!(
"http://localhost{}?{}",
uri.path(),
uri.query().unwrap_or_default()
)
};
let uri = req_uri_str.parse().unwrap();
let mut req_copy = Request::new(method, uri);
req_copy.set_version(version);
req_copy.headers_mut().extend(headers.iter());
(req_copy, body)
}
fn perform_proxy_request(
client: HttpClient,
req: Request,
) -> Box<Future<Item = Response, Error = hyper::Error>> {
Box::new(client.request(req).or_else(|err| {
let body = format!("{}\n", err);
Ok(Response::new()
.with_status(StatusCode::BadRequest)
.with_header(ContentType::plaintext())
.with_header(ContentLength(body.len() as u64))
.with_body(body))
}))
}
impl Service for Server {
type Request = Request;
type Response = Response;
type Error = hyper::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;
fn call(&self, req: Request) -> Self::Future {
let (mut req, body) = create_localhost_request(req);
let client = self.client.clone();
let work = body
.concat2()
.map(|chunk| chunk.to_vec())
// Do whatever we need with the body here, but be careful
// about doing any synchronous work.
.map(move |body| {
req.set_body(body);
req
})
.and_then(|req| perform_proxy_request(client, req));
Box::new(work)
}
}
Upvotes: 5