Reputation: 2489
I am using tokio to implement a server that exchanges messages serialized with serde (bincode). Without asynchronous I/O and futures, I would write:
extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;
type Item = String; // Dummy; the real type is a complex struct that derives Serialize
type Error = bincode::Error;
// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}
fn main() {
    let ser = serialize("Test", Infinite).unwrap();
    let buf = Cursor::new(ser);
    let mut reader = std::io::BufReader::new(buf);
    println!("{:?}", decode(&mut reader))
}
But what I need is a decode function that works with an asynchronous socket:
// I need this since I get the reader from a (tokio) socket as
// let socket = TcpListener::bind(&addr, &handle).unwrap();
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
    // Does not work:
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}
The only idea I have is to manually write the length into the buffer during encoding and then use read_exact:
// Encode with size
fn encode_async(item: &Item) -> Result<Vec<u8>, Error>{
    let size = serialized_size(item);
    let mut buf = serialize(&size, Infinite).unwrap();
    let ser = serialize(item, Infinite).unwrap();
    buf.extend(ser);
    Ok(buf)
}
// Decode with size
fn decode_async<R>(reader: R) -> Box<Future<Item = Item, Error = std::io::Error>>
    where R: AsyncRead + 'static {
    let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
        let size = deserialize::<u64>(&mut &buf[..]).unwrap();
        Ok((reader, size as usize))
    }).and_then(|(reader, size)| {
        read_exact(reader, vec![0u8; size])
    }).and_then(|(reader, buf)| {
        let item = deserialize(&mut &buf[..]).unwrap();
        Ok(item)
    });
    Box::new(read)
}
fn main() {
    let ser = encode_async(&String::from("Test")).unwrap();
    let buf = Cursor::new(ser);
    let mut reader = std::io::BufReader::new(buf);
    let dec = decode_async(reader).wait();
    println!("{:?}", dec)
}
Is there a better way to implement the decoding?
Upvotes: 4
Views: 2515
Reputation: 1455
Stefan's answer is correct. However, you might be interested in the tokio-serde-* family of crates, which do this for you, specifically tokio-serde-bincode. From the readme:
Utilities needed to easily implement a Tokio Bincode transport using serde for serialization and deserialization of frame values. Based on tokio-serde.
The crate has several examples of how to use it.
Upvotes: 0
Reputation: 5530
deserialize_from can't handle IO errors, especially not of the kind WouldBlock, which is returned by async (non-blocking) Readers when they are waiting for more data. That is limited by the interface: deserialize_from doesn't return a Future or a partial state; it returns the full decoded Result and wouldn't know how to combine the Reader with an event loop to handle WouldBlock without busy looping.
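To make the WouldBlock problem concrete, here is a minimal std-only sketch. FlakyReader and decode_u32 are hypothetical names invented for this illustration: the first mimics a non-blocking socket that has no data on the first read, and the second stands in for any synchronous decoder such as deserialize_from. It is not how bincode is implemented, only a model of the same failure mode.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical reader that mimics a non-blocking socket: it reports
/// `WouldBlock` on the first read, then serves its data.
struct FlakyReader {
    data: Vec<u8>,
    pos: usize,
    blocked_once: bool,
}

impl FlakyReader {
    fn new(data: Vec<u8>) -> Self {
        FlakyReader { data, pos: 0, blocked_once: false }
    }
}

impl Read for FlakyReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if !self.blocked_once {
            self.blocked_once = true;
            return Err(io::Error::new(ErrorKind::WouldBlock, "no data yet"));
        }
        let n = buf.len().min(self.data.len() - self.pos);
        buf[..n].copy_from_slice(&self.data[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Stand-in for a synchronous decoder: it wants all four bytes in one call
/// and has no way to suspend and resume when the reader is not ready.
fn decode_u32<R: Read>(reader: &mut R) -> io::Result<u32> {
    let mut bytes = [0u8; 4];
    reader.read_exact(&mut bytes)?;
    Ok(u32::from_be_bytes(bytes))
}

fn main() {
    let mut reader = FlakyReader::new(vec![0, 0, 0, 42]);
    // The first attempt surfaces the WouldBlock error to the caller.
    let first = decode_u32(&mut reader);
    println!("{:?}", first.map_err(|e| e.kind()));
    // Retrying from scratch works here only because no bytes were consumed
    // before the error; a decoder interrupted mid-message would lose them.
    println!("{:?}", decode_u32(&mut reader));
}
```

The caller's only options are to give up or to call the decoder again in a loop, which is the busy looping mentioned above.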
Theoretically, it is possible to implement an async_deserialize_from, but not by using the interfaces provided by serde unless you read the full data to decode in advance, which would defeat the purpose.
You need to read the full data first: use tokio_io::io::read_to_end if the message runs to the end of the stream, or tokio_io::io::read_exact (what you're currently using) if you know the size of the encoded data in an "endless" stream (or in a stream followed by other data).
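The "know the size" case is usually handled with a length prefix, as in the question. Below is a minimal sketch of that framing using only the standard library, so it runs without tokio or bincode. encode_frame and decode_frame are hypothetical helper names, and the 8-byte big-endian prefix is an assumption chosen for the example (bincode's own encoding of the length may differ). With tokio_io, each blocking read_exact call would be replaced by the future-returning tokio_io::io::read_exact.

```rust
use std::convert::TryFrom;
use std::io::{self, Cursor, Read};

/// Prefix the payload with its length as an 8-byte big-endian integer.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&(payload.len() as u64).to_be_bytes());
    frame.extend_from_slice(payload);
    frame
}

/// Read one frame: the 8-byte length first, then exactly that many bytes.
fn decode_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut len_bytes = [0u8; 8];
    reader.read_exact(&mut len_bytes)?;
    let len = usize::try_from(u64::from_be_bytes(len_bytes))
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "frame too large"))?;
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> io::Result<()> {
    // Two frames back to back, as they would appear on a long-lived socket.
    let mut stream = encode_frame(b"Test");
    stream.extend(encode_frame(b"second"));
    let mut reader = Cursor::new(stream);
    let first = decode_frame(&mut reader)?;
    let second = decode_frame(&mut reader)?;
    println!(
        "{:?} {:?}",
        String::from_utf8_lossy(&first),
        String::from_utf8_lossy(&second)
    );
    Ok(())
}
```

Once a whole frame is in memory, the payload can be handed to the synchronous deserialize. A real server should also cap the accepted length before allocating, since the prefix comes from the peer.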
Upvotes: 4