Vikas Shetty
Vikas Shetty

Reputation: 37

How can I implement a pull-based system using Tokio?

I want to implement a pull-based system between a server and a client where the server will only push data when the client asks for it.

I was playing with Tokio and was able to create a push-based system where I was able to push a string at an interval of 1ms.

let done = listener
    .incoming()
    .for_each(move |socket| {
        let server_queue = _cqueue.clone();
        let (reader, mut writer) = socket.split();
        let sender = Interval::new_interval(std::time::Duration::from_millis(1))
            .for_each(move |_| {
                writer
                    .poll_write(server_queue.pull().borrow())
                    .map_err(|_| {
                        tokio::timer::Error::shutdown();
                    })
                    .unwrap();
                return Ok(());
            })
            .map_err(|e| println!("{}", e));
        ;
        tokio::spawn(sender);
        return Ok(());
    })
    .map_err(|e| println!("Future_error {}", e));

Is there a way to send only when the client asks for it without having to use a reader?

Upvotes: 0

Views: 564

Answers (1)

Sébastien Renauld
Sébastien Renauld

Reputation: 19672

Let's think back for a moment on the kind of events that could lead to this "sending of data". You can think of multiple ways:

  • The client connects to the server. By contract, this is "asking for data". You've implemented this case
  • The client sends an in-band message on the socket/pipe connecting the client and server. For that, you need to take the AsyncRead part of your socket, the AsyncWrite part that you've already used and build a duplex channel so you can read and talk at the same time
  • The client sends an out-of-band message, typically on another proto-host-port triplet and using a different protocol. Your current server recognizes it, and sends the client that data. To do this, you need a reader for the other triplet, and you need a messaging structure in place to relay this to the one place having access to the AsyncWrite part of your socket

The short answer is no, you cannot really act on an event that you're not listening for.


@Shepmaster I was just wondering if there was an existing library that can be used to handle this "neatly"

There is, and then there isn't.

Most libraries are centered around a specific problem. In your case, you've opted to work at the lowest possible level by having a TCP socket (implementing AsyncRead + AsyncWrite).

To do anything, you're going to need to decide on:

  1. A transport format
  2. A protocol

I tend to wrap code into this when I need a quick and dirty implementation of a duplex stream:

use futures::sync::mpsc::{UnboundedSender, unbounded};
use std::sync::{Arc};
use futures::{Sink, Stream, Future, future, stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::codec::{Framed, Encoder, Decoder};
use std::io;
use std::fmt::Debug;
use futures_locks::{RwLock as FutLock};

enum Message<T:Send+Debug+'static> {
    Content(T),
    Done
}

impl<T: Send + Debug + 'static> From<T> for Message<T> {
    fn from(message:T) -> Message<T> {
        Message::Content(message)
    }
}

struct DuplexStream<T:Send+Debug+'static> {
    writer: Arc<FutLock<UnboundedSender<Message<T>>>>,
    handlers: Arc<FutLock<Option<Box<dyn Stream<Item = Message<T>, Error = ()> + Send>>>>
}

impl<T:Send+Debug+'static> DuplexStream<T> {

    pub fn from<R,U>(framed_socket: Framed<R, U>) -> Arc<DuplexStream<T>>
        where U: Send + Encoder<Item = T> + Decoder<Item = T> + 'static, R: Send + AsyncRead + AsyncWrite + 'static {

        let (tx, rx) = framed_socket.split();

        // Assemble the combined upstream stream
        let (upstream_tx, upstream_rx) = unbounded();
        let upstream = upstream_rx.take_while(|item| match item {
            Message::Done => future::ok(false),
            _ => future::ok(true)
        }).fold(tx, |o, m| {
            o.send(match m {
                Message::Content(i) => i,
                _ => unreachable!()
            }).map_err(|_| {
                ()
            })
        }).map(|e| {
            Message::Done
        }).into_stream();

        // Assemble the downstream stream
        let downstream = rx.map_err(|_| ()).map(|r| {
            Message::Content(r)
        }).chain(stream::once(Ok(Message::Done)));

        Arc::new(DuplexStream {
            writer: Arc::new(FutLock::new(upstream_tx)),
            handlers: Arc::new(FutLock::new(Some(Box::new(upstream.select(downstream).take_while(|m| match m {
                Message::Content(_) => {
                    future::ok(true)
                },
                Message::Done => {
                    future::ok(false)
                }
            })))))
        })
    }

    pub fn start(self: Arc<Self>) -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
        Box::new(self.handlers
            .write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))
            .map(|mut handler| -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
                match handler.take() {
                    Some(e) => Box::new(e.map(|r| match r {
                        Message::Content(i) => i,
                        _ => unreachable!()
                    }).map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))),
                    None => Box::new(stream::once(Err(io::Error::new(io::ErrorKind::AddrInUse, "Handler already taken"))))
                }
            }).into_stream().flatten()
        )
    }

    pub fn close(self: Arc<Self>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(Message::Done)
    }
    pub fn send(self: Arc<Self>, message: T) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(message.into())
    }
    pub fn inner_send(self: Arc<Self>, message: Message<T>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        Box::new(self.writer.write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "The mutex has disappeared")).and_then(|guard| {
                future::result(guard.unbounded_send(message).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "The sink has gone away")))
        }))
    }
}

This struct has a multitude of advantages, but a few drawbacks. The main advantage is that you can deal with the read and write part on the same object the same way you would in another language. The object itself implements Clone (since it's an Arc), every method is usable everywhere (particularly useful for old futures code) and as long as you keep a copy of it somewhere and don't call close() it'll keep running (as long as the underlying AsyncRead + AsyncWrite implementation is still there).

This does not absolve you from points 1 and 2, but you can (and should) leverage tokio::codec::Framed for point 1, and implement point 2 as business logic.

An example (it's actually a test ;-) ) of the usage:

#[test]
fn it_writes() {
    let stream = DuplexStream::from(make_w());
    let stream_write = Arc::clone(&stream);
    let stream_read=  Arc::clone(&stream);
    let dup = Arc::clone(&stream);
    tokio::run(lazy(move || {
        let stream_write = Arc::clone(&stream_write);
        stream_read.start().and_then(move |i| {
            let stream_write = Arc::clone(&stream_write);
            stream_write.send("foo".to_string()).map(|_| i)
        }).collect().map(|r| {
            assert_eq!(r, vec!["foo".to_string(), "bar".to_string(), "bazfoo".to_string(), "foo".to_string()])
        }).map_err(|_| {
            assert_eq!(true, false);
        })
    }));
}

Upvotes: 1

Related Questions