Reputation: 100110
I have a function that takes a &mut io::Write, and I'd like to use it to send a streaming response from the actix-web server without having to buffer the whole response. The function "pushes" the data, and I can't change it to use async streams or any other kind of polling (that's the whole premise of this question).
Currently I'm forced to use &mut Vec<u8> (which implements io::Write) to buffer the whole result and then send the Vec as the response body. However, the response may be large, so I'd rather stream it without buffering.
Is there some kind of adapter that implements io::Write, with writes blocking as necessary in response to backpressure, and that is compatible with types actix-web can use for responses (e.g. futures::Stream)?
fn generate(output: &mut dyn io::Write) {
    // ...
}

fn request_handler() -> Result<HttpResponse> {
    thread::spawn(|| generate(/*???*/));
    Ok(HttpResponse::Ok().body(/*???*/))
}
std::sync::mpsc has both ends blocking and futures::mpsc has both ends async, so it's not obvious how to use either of them as an adapter between a sync end and an async end.
Upvotes: 2
Views: 1989
Reputation: 430711
It is possible. The key piece is futures::sink::Wait:

    A sink combinator which converts an asynchronous sink to a blocking sink.
    Created by the Sink::wait method, this function transforms any sink into a blocking version. This is implemented by blocking the current thread when a sink is otherwise unable to make progress.

All that is needed is to wrap this type in a struct that implements io::Write:
use futures::{
    sink::{Sink, Wait},
    sync::mpsc,
}; // 0.1.26
use std::{io, thread};

fn generate(_output: &mut dyn io::Write) {
    // ...
}

// Blocking wrapper around the sending half of an async channel.
struct MyWrite<T>(Wait<mpsc::Sender<T>>);

impl<T> io::Write for MyWrite<T>
where
    T: for<'a> From<&'a [u8]> + Send + Sync + 'static,
{
    fn write(&mut self, d: &[u8]) -> io::Result<usize> {
        let len = d.len();
        // Blocks the current thread until the channel accepts the chunk.
        self.0
            .send(d.into())
            .map(|()| len)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0
            .flush()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }
}

fn foo() -> impl futures::Stream<Item = Vec<u8>, Error = ()> {
    // Bounded channel: a full buffer is what makes the writer block.
    let (tx, rx) = mpsc::channel(5);
    let mut w = MyWrite(tx.wait());
    // The producer runs on its own thread so it can block freely.
    thread::spawn(move || generate(&mut w));
    rx
}
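
To experiment with the backpressure behaviour without pulling in futures, the same adapter shape can be sketched with only the standard library. This is an analogue, not a drop-in replacement: it swaps the futures channel for std::sync::mpsc::sync_channel, so the receiving end is blocking and cannot be handed to actix-web as a Stream. The names ChannelWriter and spawn_producer, and the body of generate, are made up for this illustration.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Hypothetical stand-in for the push-style producer that cannot be changed.
fn generate(output: &mut dyn Write) {
    for i in 0..3 {
        // Errors are ignored here; a real producer would report them.
        let _ = output.write_all(format!("chunk {}\n", i).as_bytes());
    }
}

// Blocking io::Write adapter over a bounded std channel (assumed name).
struct ChannelWriter(SyncSender<Vec<u8>>);

impl Write for ChannelWriter {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        // `send` blocks while the channel is full: that is the backpressure.
        self.0
            .send(data.to_vec())
            .map(|()| data.len())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))
    }

    fn flush(&mut self) -> io::Result<()> {
        // Every chunk is handed over in `write`, so nothing is buffered here.
        Ok(())
    }
}

// Runs the producer on its own thread and returns the consuming end.
fn spawn_producer(capacity: usize) -> Receiver<Vec<u8>> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || generate(&mut ChannelWriter(tx)));
    rx
}

fn main() {
    // With capacity 1 the producer can be at most one chunk ahead of us.
    let body: Vec<u8> = spawn_producer(1).into_iter().flatten().collect();
    print!("{}", String::from_utf8_lossy(&body));
}
```

The receiver's iterator ends once the producer thread finishes and drops the writer, which is how the consumer learns that the body is complete. If the consumer goes away first, the next write fails with BrokenPipe instead of blocking forever.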
Upvotes: 5
Reputation: 154
It is not possible. Actix-web manages its own write buffer and socket.
Upvotes: -1