osa1
osa1

Reputation: 7078

Rust futures -- join two futures together

I have this code

use futures::Map;
use futures::sink::SendAll;
use futures::sink::SinkFromErr;
use futures::stream::Forward;
use futures::sync::mpsc::Receiver;
use futures::sync::mpsc::Sender;
use futures::{Future, Stream, Sink};
use std::boxed::FnBox;
use tokio_core::reactor::Core;
use websocket::async::futures::stream::SplitSink;
use websocket::async::futures::stream::SplitStream;
use websocket::ClientBuilder;
use websocket;

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
    Box::new(move || {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        let f = ClientBuilder::new("wss://...")
            .unwrap()
            .async_connect(None, &handle)
            .from_err::<Error>()
            .map(|(duplex, _)| duplex.split())
            .and_then(|(sink, stream): (SplitSink<_>, SplitStream<_>)| {

                let writer: Map<SendAll<SinkFromErr<SplitSink<_>, _>, _>, _> =
                    sink
                    .sink_from_err()
                    .send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
                    .map(|_| ());

                // Trying to uncomment these lines:
                // let reader =
                //     stream
                //     .forward(send);
                //
                // reader.join(writer)

                // Comment this out:
                writer
            });

        core.run(f).expect("Unable to run");
    })
}

quick_error! {
    #[derive(Debug)]
    pub enum Error {
        WebSocket(err: websocket::WebSocketError) {
            from()
            description("websocket error")
            display("WebSocket error: {}", err)
            cause(err)
        }
        Receiver(err: ()) {
            description("receiver error")
            display("Receiver error")
        }
    }
}

I added some type annotation for clarity. This version compiles, but I want another future, that reads from the stream (stream) and writes to send. I can't make it compile and I type errors are completely incomprehensible. So my questions are:

  1. How do I make the forward() call compile? (try to enable the commented-out code)
  2. How did you come up with the code that makes this compile? In my experience, futures-heavy code is impossible to write and understand because types are too complex and type errors are incomprehensible.

Upvotes: 0

Views: 2175

Answers (1)

osa1
osa1

Reputation: 7078

I needed two map_err calls, one for mapping websocket errors from stream, one for mapping sender errors from send:

pub fn main(recv: Receiver<String>, send: Sender<websocket::OwnedMessage>) -> Box<FnBox() -> () + Send> {
    Box::new(move || {
        let mut core = Core::new().unwrap();
        let handle = core.handle();

        let f = ClientBuilder::new("wss://...")
            .unwrap()
            .async_connect(None, &handle)
            .from_err::<Error>()
            .map(|(duplex, _)| duplex.split())
            .and_then(|(sink, stream): (SplitSink<_>, SplitStream<_>)| {

                let writer: Map<SendAll<SinkFromErr<SplitSink<_>, _>, _>, _> =
                    sink
                    .sink_from_err()
                    .send_all(recv.map(websocket::OwnedMessage::Text).map_err(Error::Receiver))
                    .map(|_| ());

                let reader =
                    stream
                    .map_err(Error::WebSocket)
                    .forward(send.sink_map_err(Error::Sender));

                reader.join(writer)
            });

        core.run(f).expect("Unable to run");
    })
}

Upvotes: 4

Related Questions