Reputation: 37
I want to implement a pull-based system between a server and a client where the server will only push data when the client asks for it.
I was playing with Tokio and managed to build a push-based system that pushes a string every 1 ms.
let done = listener
    .incoming()
    .for_each(move |socket| {
        let server_queue = _cqueue.clone();
        let (reader, mut writer) = socket.split();
        let sender = Interval::new_interval(std::time::Duration::from_millis(1))
            .for_each(move |_| {
                writer
                    .poll_write(server_queue.pull().borrow())
                    .map_err(|_| {
                        tokio::timer::Error::shutdown();
                    })
                    .unwrap();
                return Ok(());
            })
            .map_err(|e| println!("{}", e));
        tokio::spawn(sender);
        return Ok(());
    })
    .map_err(|e| println!("Future_error {}", e));
Is there a way to send only when the client asks for it without having to use a reader?
Upvotes: 0
Views: 564
Reputation: 19672
Let's think back for a moment on the kind of events that could lead to this "sending of data". You can think of multiple ways:
- [...] AsyncRead part of your socket, the AsyncWrite part that you've already used and build a duplex channel so you can read and talk at the same time
- [...] AsyncWrite part of your socket

The short answer is no, you cannot really act on an event that you're not listening for.
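To make the "you have to listen before you can answer" point concrete, here is a minimal sketch of a pull loop. It is not Tokio code: it uses the blocking std::io traits so it stays self-contained, and the "PULL" and "END" words are a made-up protocol for illustration only. The reader and writer arguments stand in for the two halves of a socket.

```rust
use std::io::{self, BufRead, Write};

/// Answer pull requests: for every input line equal to "PULL", write the next
/// item to `out`. Nothing is written unless a request was read first.
/// "PULL" and "END" are hypothetical protocol words for this sketch.
fn serve_pulls<R: BufRead, W: Write>(requests: R, mut out: W, items: &[&str]) -> io::Result<()> {
    let mut remaining = items.iter();
    for line in requests.lines() {
        if line?.trim() != "PULL" {
            continue; // not a pull request: send nothing
        }
        match remaining.next() {
            Some(item) => writeln!(out, "{}", item)?,
            None => {
                writeln!(out, "END")?; // no more data to hand out
                break;
            }
        }
    }
    out.flush()
}

/// Helper for trying the sketch without a network: feeds `input` as the
/// client's requests and returns everything the server wrote back.
fn replies_for(input: &str) -> String {
    let mut out = Vec::new();
    serve_pulls(input.as_bytes(), &mut out, &["a", "b"]).expect("in-memory I/O cannot fail");
    String::from_utf8(out).expect("replies are valid UTF-8")
}
```

The only thing that triggers a write is a line arriving on the read side, which is why some kind of reader is unavoidable. With a real connection you would pass a buffered reader over one handle of the socket and a second handle as the writer.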
@Shepmaster I was just wondering if there was an existing library that can be used to handle this "neatly"
There is, and then there isn't.
Most libraries are centered around a specific problem. In your case, you've opted to work at the lowest possible level by having a TCP socket (implementing AsyncRead + AsyncWrite).
To do anything, you're going to need to decide on:
I tend to wrap code into this when I need a quick and dirty implementation of a duplex stream:
use futures::sync::mpsc::{UnboundedSender, unbounded};
use std::sync::{Arc};
use futures::{Sink, Stream, Future, future, stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::codec::{Framed, Encoder, Decoder};
use std::io;
use std::fmt::Debug;
use futures_locks::{RwLock as FutLock};

enum Message<T:Send+Debug+'static> {
    Content(T),
    Done
}

impl<T: Send + Debug + 'static> From<T> for Message<T> {
    fn from(message:T) -> Message<T> {
        Message::Content(message)
    }
}

struct DuplexStream<T:Send+Debug+'static> {
    writer: Arc<FutLock<UnboundedSender<Message<T>>>>,
    handlers: Arc<FutLock<Option<Box<dyn Stream<Item = Message<T>, Error = ()> + Send>>>>
}

impl<T:Send+Debug+'static> DuplexStream<T> {

    pub fn from<R,U>(framed_socket: Framed<R, U>) -> Arc<DuplexStream<T>>
        where U: Send + Encoder<Item = T> + Decoder<Item = T> + 'static, R: Send + AsyncRead + AsyncWrite + 'static {

        let (tx, rx) = framed_socket.split();

        // Assemble the combined upstream stream
        let (upstream_tx, upstream_rx) = unbounded();
        let upstream = upstream_rx.take_while(|item| match item {
            Message::Done => future::ok(false),
            _ => future::ok(true)
        }).fold(tx, |o, m| {
            o.send(match m {
                Message::Content(i) => i,
                _ => unreachable!()
            }).map_err(|_| {
                ()
            })
        }).map(|_| {
            Message::Done
        }).into_stream();

        // Assemble the downstream stream
        let downstream = rx.map_err(|_| ()).map(|r| {
            Message::Content(r)
        }).chain(stream::once(Ok(Message::Done)));

        Arc::new(DuplexStream {
            writer: Arc::new(FutLock::new(upstream_tx)),
            handlers: Arc::new(FutLock::new(Some(Box::new(upstream.select(downstream).take_while(|m| match m {
                Message::Content(_) => {
                    future::ok(true)
                },
                Message::Done => {
                    future::ok(false)
                }
            })))))
        })
    }

    pub fn start(self: Arc<Self>) -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
        Box::new(self.handlers
            .write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))
            .map(|mut handler| -> Box<dyn Stream<Item = T, Error = io::Error> + Send> {
                match handler.take() {
                    Some(e) => Box::new(e.map(|r| match r {
                        Message::Content(i) => i,
                        _ => unreachable!()
                    }).map_err(|_| io::Error::new(io::ErrorKind::NotFound, "Stream closed"))),
                    None => Box::new(stream::once(Err(io::Error::new(io::ErrorKind::AddrInUse, "Handler already taken"))))
                }
            }).into_stream().flatten()
        )
    }

    pub fn close(self: Arc<Self>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(Message::Done)
    }

    pub fn send(self: Arc<Self>, message: T) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        self.inner_send(message.into())
    }

    pub fn inner_send(self: Arc<Self>, message: Message<T>) -> Box<dyn Future<Item = (), Error = io::Error> + Send> {
        Box::new(self.writer.write()
            .map_err(|_| io::Error::new(io::ErrorKind::NotFound, "The mutex has disappeared")).and_then(|guard| {
                future::result(guard.unbounded_send(message).map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "The sink has gone away")))
            }))
    }
}
This struct has a multitude of advantages, but a few drawbacks. The main advantage is that you can deal with the read and write part on the same object the same way you would in another language. The object itself implements Clone (since it's an Arc), every method is usable everywhere (particularly useful for old futures code) and as long as you keep a copy of it somewhere and don't call close() it'll keep running (as long as the underlying AsyncRead + AsyncWrite implementation is still there).
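If the futures plumbing obscures the idea, the write side boils down to "cloneable handles feeding one writer through a channel, with an explicit end marker". Below is a rough std-only sketch of just that part, using threads instead of futures. The names Msg, WriteHandle and spawn_writer are made up for this illustration, and the writer thread merely collects what it would have written to the socket.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Same idea as the `Message` enum: payloads plus an explicit end marker.
enum Msg {
    Content(String),
    Done,
}

/// Cloneable write handle; every clone feeds the same writer thread.
#[derive(Clone)]
struct WriteHandle {
    tx: Sender<Msg>,
}

impl WriteHandle {
    /// Queue a message; returns false if the writer has already stopped.
    fn send(&self, text: &str) -> bool {
        self.tx.send(Msg::Content(text.to_string())).is_ok()
    }

    /// Ask the writer to stop after the messages queued so far.
    fn close(&self) -> bool {
        self.tx.send(Msg::Done).is_ok()
    }
}

/// Spawn the writer. It stands in for the socket's write half and simply
/// collects what it would have written, returning it when it stops.
fn spawn_writer() -> (WriteHandle, JoinHandle<Vec<String>>) {
    let (tx, rx) = channel();
    let worker = thread::spawn(move || {
        let mut written = Vec::new();
        for msg in rx {
            match msg {
                Msg::Content(text) => written.push(text),
                Msg::Done => break, // explicit close: stop even if handles remain
            }
        }
        written
    });
    (WriteHandle { tx }, worker)
}
```

Any clone of the handle can send or close, and once the writer has stopped, further sends report failure instead of silently queueing, which mirrors the "sink has gone away" error in the struct.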
This does not absolve you from points 1 and 2, but you can (and should) leverage tokio::codec::Framed for point 1, and implement point 2 as business logic.
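Assuming point 1 is about how individual messages are delimited on the wire (the job a Framed codec does for you), here is a hand-rolled sketch of the kind of logic an encoder/decoder pair carries. It is plain std Rust, not the tokio codec API, and the 4-byte big-endian length prefix is just one possible framing chosen for illustration.

```rust
/// Append one frame to `buf`: a 4-byte big-endian length, then the payload.
fn encode_frame(payload: &[u8], buf: &mut Vec<u8>) {
    buf.extend_from_slice(&(payload.len() as u32).to_be_bytes());
    buf.extend_from_slice(payload);
}

/// Try to take one complete frame off the front of `buf`.
/// Returns None (and leaves `buf` untouched) when more bytes are needed.
fn decode_frame(buf: &mut Vec<u8>) -> Option<Vec<u8>> {
    if buf.len() < 4 {
        return None; // length prefix not fully received yet
    }
    let len = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]) as usize;
    if buf.len() < 4 + len {
        return None; // payload not fully received yet
    }
    let payload = buf[4..4 + len].to_vec();
    buf.drain(..4 + len); // consume the frame, keep any following bytes
    Some(payload)
}
```

The important property is that decoding copes with partial reads: TCP hands you a byte stream, so a message may arrive split across several reads or glued to the next one. A codec plugged into Framed plays the same role, which is what lets the DuplexStream above deal in whole T values.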
An example (it's actually a test ;-) ) of the usage:
#[test]
fn it_writes() {
    // make_w() is a test helper that is not shown here; presumably it builds
    // the Framed socket that DuplexStream::from expects.
    let stream = DuplexStream::from(make_w());
    let stream_write = Arc::clone(&stream);
    let stream_read= Arc::clone(&stream);
    let dup = Arc::clone(&stream);
    tokio::run(lazy(move || {
        let stream_write = Arc::clone(&stream_write);
        // For every item read, send "foo" back on the same object
        stream_read.start().and_then(move |i| {
            let stream_write = Arc::clone(&stream_write);
            stream_write.send("foo".to_string()).map(|_| i)
        }).collect().map(|r| {
            assert_eq!(r, vec!["foo".to_string(), "bar".to_string(), "bazfoo".to_string(), "foo".to_string()])
        }).map_err(|_| {
            // Any stream error fails the test
            assert_eq!(true, false);
        })
    }));
}
Upvotes: 1