Kornel

Reputation: 100110

How to send data through a futures Stream by writing through the io::Write trait?

I have a function that takes a &mut io::Write, and I'd like to use it to send a streaming response from the actix-web server without having to buffer the whole response. The function "pushes" the data, and I can't change it to use async streams or any other kind of polling (that's the whole premise of this question).

Currently I'm forced to use a &mut Vec<u8> (which implements io::Write) to buffer the whole result and then send the Vec as the response body. However, the response may be large, so I'd rather stream it without buffering.

Is there some kind of adapter that would implement io::Write, with writes blocking as necessary in response to backpressure, and be compatible with types that actix-web can use for responses (e.g. futures::Stream)?

fn generate(output: &mut io::Write) {
    // ...
}

fn request_handler() -> Result<HttpResponse> {
    thread::spawn(|| generate(/*???*/));
    Ok(HttpResponse::Ok().body(/*???*/))
}

std::sync::mpsc has both ends blocking and futures::sync::mpsc has both ends async, so it's not obvious how to use either of them as an adapter between a sync end and an async end.

Upvotes: 2

Views: 1989

Answers (2)

Shepmaster

Reputation: 430711

It is possible. The key piece is futures::sink::Wait:

A sink combinator which converts an asynchronous sink to a blocking sink.

Created by the Sink::wait method, this function transforms any sink into a blocking version. This is implemented by blocking the current thread when a sink is otherwise unable to make progress.

All that is needed is to wrap this type in a struct that implements io::Write:

use futures::{
    sink::{Sink, Wait},
    sync::mpsc,
}; // 0.1.26
use std::{io, thread};

fn generate(_output: &mut io::Write) {
    // ...
}

struct MyWrite<T>(Wait<mpsc::Sender<T>>);

impl<T> io::Write for MyWrite<T>
where
    T: for<'a> From<&'a [u8]> + Send + Sync + 'static,
{
    fn write(&mut self, d: &[u8]) -> io::Result<usize> {
        let len = d.len();
        self.0
            .send(d.into())
            .map(|()| len)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0
            .flush()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }
}

fn foo() -> impl futures::Stream<Item = Vec<u8>, Error = ()> {
    let (tx, rx) = mpsc::channel(5);

    let mut w = MyWrite(tx.wait());

    thread::spawn(move || generate(&mut w));

    rx
}
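
To see the backpressure behaviour in isolation, here is a rough sketch of the same wrapper shape using only the standard library. It is not the futures-based adapter above: the receiving end here is blocking rather than a Stream, and ChannelWriter, spawn_producer, collect_all and the body of generate are hypothetical names made up for illustration. What it shows is the write side: each write call turns into one owned chunk, and the writer thread blocks whenever the bounded channel is full.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

// Hypothetical stand-in for the question's `generate`: it only knows io::Write.
fn generate(output: &mut dyn Write) -> io::Result<()> {
    for i in 0..4 {
        write!(output, "chunk {};", i)?;
    }
    output.flush()
}

// Hypothetical adapter: every write becomes one owned chunk on a bounded channel.
struct ChannelWriter(SyncSender<Vec<u8>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `send` blocks while the channel is full; that is the backpressure.
        self.0
            .send(buf.to_vec())
            .map(|()| buf.len())
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing is buffered inside the adapter itself.
        Ok(())
    }
}

// Runs `generate` on its own thread and hands back the receiving end.
fn spawn_producer(capacity: usize) -> Receiver<Vec<u8>> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        let mut w = ChannelWriter(tx);
        // An error here means the receiver went away, so just stop producing.
        let _ = generate(&mut w);
    });
    rx
}

// Drains the channel until the writer is dropped and joins the chunks.
fn collect_all(rx: Receiver<Vec<u8>>) -> Vec<u8> {
    rx.into_iter().flatten().collect()
}
```

Because every write call sends one message, a generate that makes many tiny writes produces many tiny chunks. Wrapping the adapter in std::io::BufWriter before passing it to generate should coalesce them, and the same idea ought to carry over to MyWrite above.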

Upvotes: 5

Nikolay Kim

Reputation: 154

It is not possible. Actix-web manages its own write buffer and socket.

Upvotes: -1
