osa1
osa1

Reputation: 7078

Split a futures connection into a sink and a stream and use them in two different tasks

I'm experimenting with the futures API using the websocket library. I have this code:

use futures::future::Future;
use futures::future;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::sync::mpsc::channel;
use futures::sync::mpsc::{Sender, Receiver};
use tokio_core::reactor::Core;

use websocket::{ClientBuilder, OwnedMessage};

pub fn main() { // connect over WebSocket, then try to run a writer task next to a reader loop
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let handle_clone = handle.clone();
    // channel of `String`s created with `channel(100)`; `recv` is handed to the writer task below
    let (send, recv): (Sender<String>, Receiver<String>) = channel(100);

    let f = ClientBuilder::new("wss://...")
        .unwrap()
        .async_connect(None, &handle_clone)
        .map_err(|e| println!("error: {:?}", e)) // print the error; the error type becomes `()`
        // separate the connection into its two halves so each task can own one
        .map(|(duplex, _)| duplex.split())
        .and_then(move |(sink, stream)| {
            // this task is meant to consume the channel and write messages to the websocket;
            // as written, the closure never reads its `recv` argument and only sends one Close
            handle_clone.spawn(future::loop_fn(recv, |recv: Receiver<String>| {
                sink.send(OwnedMessage::Close(None)) // E0507: `sink` is moved out of the closure here
                    .and_then(|_| future::ok(future::Loop::Break(())))
                    .map_err(|_| ())
            }));

            // the main task listens on the socket; `Loop::Break` ends the loop after one `into_future`
            future::loop_fn(stream, |stream| {
                stream
                    .into_future()
                    .and_then(|_| future::ok(future::Loop::Break(())))
                    .map_err(|_| ())
            })
        });
    // NOTE(review): `f` is never passed to `core` in this snippet — presumably it must be run or spawned; confirm
    loop {
        core.turn(None) // NOTE(review): `None` presumably means "no timeout" — confirm
    }
}

After connecting to the server, I want to run "listener" and "sender" tasks without one blocking the other. The problem is that I can't use sink in the new task; it fails with:

error[E0507]: cannot move out of captured outer variable in an `FnMut` closure
  --> src/slack_conn.rs:29:17
   |
25 |         .and_then(move |(sink, stream)| {
   |                          ---- captured outer variable
...
29 |                 sink.send(OwnedMessage::Close(None))
   |                 ^^^^ cannot move out of captured outer variable in an `FnMut` closure

I could directly use duplex to send and receive, but that leads to worse errors.

Any ideas on how to make this work? Indeed, I'd be happy with any futures code that allows me to connect to a server without blocking and spawn two async tasks: one that reads from the connection and one that writes to it.

It's fine if I have to write it in a different style.

Upvotes: 1

Views: 2902

Answers (1)

Shepmaster
Shepmaster

Reputation: 430368

SplitSink implements Sink which defines send to take ownership:

fn send(self, item: Self::SinkItem) -> Send<Self>
where
    Self: Sized,

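On the other hand, loop_fn requires that the closure be able to be called multiple times. These two things are fundamentally incompatible — how can you call something multiple times which requires consuming a value? The code below sidesteps the problem by not using loop_fn at all: the stream is drained with for_each and the sink is fed with send_all, each of which consumes its receiver exactly once.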


Here's a completely untested piece of code that compiles — I don't have rogue WebSocket servers lying about.

#[macro_use]
extern crate quick_error;

extern crate futures;
extern crate tokio_core;
extern crate websocket;

use futures::{Future, Stream, Sink};
use futures::sync::mpsc::channel;
use tokio_core::reactor::Core;

use websocket::ClientBuilder;

pub fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let (send, recv) = channel(100);

    let f = ClientBuilder::new("wss://...")
        .unwrap()
        .async_connect(None, &handle)
        .from_err::<Error>()
        .map(|(duplex, _)| duplex.split())
        .and_then(|(sink, stream)| {
            let reader = stream
                .for_each(|i| {
                    println!("Read a {:?}", i);
                    Ok(())
                })
                .from_err();

            let writer = sink
               .sink_from_err()
               .send_all(recv.map_err(Error::Receiver))
               .map(|_| ());

            reader.join(writer)
        });

    drop(send); // Close the sending channel manually

    core.run(f).expect("Unable to run");
}

// Single error type that both halves of the connection are converted into,
// so that the reader and writer futures can be joined.
quick_error! {
    /// Errors that can surface while running the WebSocket session.
    #[derive(Debug)]
    pub enum Error {
        /// Failure reported by the `websocket` crate.
        WebSocket(err: websocket::WebSocketError) {
            // `from()` is what lets `from_err` / `sink_from_err` convert
            // a `WebSocketError` into this variant.
            from()
            description("websocket error")
            display("WebSocket error: {}", err)
            cause(err)
        }
        /// Failure of the channel receiver; its error type is `()`, so it is
        /// wrapped explicitly via `map_err(Error::Receiver)` rather than `from()`.
        Receiver(err: ()) {
            description("receiver error")
            display("Receiver error")
        }
    }
}

The points that stuck out during implementation were:

  1. everything has to become a Future eventually
  2. it's way easier to define an error type and convert to it
  3. Knowing if the Item and Error associated types were "right" was tricky. I ended up doing a lot of "type assertions" ({ let x: &Future<Item = (), Error = ()> = &reader; }).

Upvotes: 2

Related Questions