Manuel Schmidt

Reputation: 2489

Deserialize from tokio socket

I am using tokio to implement a server that exchanges messages serialized with serde (bincode). Without async I/O and futures, I would write:

extern crate tokio_io;
extern crate bincode;
extern crate serde;
extern crate bytes;
extern crate futures;
#[macro_use] extern crate serde_derive;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio_io::io::{read_exact, write_all};
use bincode::{serialize, deserialize, deserialize_from, Infinite, serialized_size};
use std::io::Read;
use std::io::Cursor;
use futures::future::Future;

type Item = String; // Dummy; the real type is a complex struct that derives Serialize
type Error = bincode::Error;

// This works
fn decode<R>(reader: &mut R) -> Result<Item, Error> where R: Read {
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

fn main() {

    let ser = serialize("Test", Infinite).unwrap();
    let buf = Cursor::new(ser);

    let mut reader = std::io::BufReader::new(buf);

    println!("{:?}", decode(&mut reader))
}

But what I need is a decode function that works with an asynchronous socket, such as:

// I need this since I get the reader from a (tokio) socket as
// let socket = TcpListener::bind(&addr, &handle).unwrap();
// let (reader, writer) = socket.split();
fn decode_async<R>(reader: R) -> Result<Item, Error> where R: AsyncRead {
    // Does not work:   
    let message: Item = deserialize_from(reader, Infinite)?;
    Ok(message)
}

The only idea I have is to manually write the length into the buffer during encoding and then use read_exact:

// Encode with a length prefix
fn encode_async(item: &Item) -> Result<Vec<u8>, Error> {
    // The size is a u64, which bincode encodes as 8 bytes
    let size = serialized_size(item);
    // Propagate serialization errors instead of panicking
    let mut buf = serialize(&size, Infinite)?;
    let ser = serialize(item, Infinite)?;
    buf.extend(ser);
    Ok(buf)
}

// Decode with size
fn decode_async<R>(reader: R) -> Box<Future<Item = Item, Error = std::io::Error>>
    where R: AsyncRead + 'static {

    let read = read_exact(reader, vec![0u8; 8]).and_then(|(reader, buf)| {
        let size = deserialize::<u64>(&mut &buf[..]).unwrap();
        Ok((reader, size as usize))
    }).and_then(|(reader, size)| {
        read_exact(reader, vec![0u8; size])
    }).and_then(|(reader, buf)| {
        let item = deserialize(&mut &buf[..]).unwrap();
        Ok(item)
    });

    Box::new(read)
}

fn main() {

    let ser = encode_async(&String::from("Test")).unwrap();
    let buf = Cursor::new(ser);

    let reader = std::io::BufReader::new(buf);
    let dec = decode_async(reader).wait();

    println!("{:?}", dec)
}

Is there a better way to implement the decoding?

Upvotes: 4

Views: 2515

Answers (2)

effect

Reputation: 1455

Stefan's answer is correct. However, you might be interested in the tokio-serde-* family of crates, which do this for you; see tokio-serde-bincode in particular. From the readme:

Utilities needed to easily implement a Tokio Bincode transport using serde for serialization and deserialization of frame values. Based on tokio-serde.

The crate has several examples of how to use it.

Upvotes: 0

Stefan

Reputation: 5530

deserialize_from cannot recover from I/O errors, in particular WouldBlock, which asynchronous (non-blocking) readers return while they wait for more data. The limitation lies in the interface: deserialize_from does not return a Future or any partial state. It returns the fully decoded Result, and it has no way to register the reader with an event loop, so it could only handle WouldBlock by busy-looping.
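To make the WouldBlock problem concrete, here is a minimal std-only sketch. PartialReader and decode_blocking are hypothetical names invented for this illustration: the first stands in for a non-blocking socket, the second for any decoder that, like deserialize_from, must return a complete value or an error. It is not how bincode is implemented, only a model of the same interface constraint.

```rust
use std::io::{self, ErrorKind, Read};

/// Hypothetical stand-in for a non-blocking socket: it hands out the bytes
/// that have "arrived" so far, then reports WouldBlock instead of blocking.
struct PartialReader {
    arrived: Vec<u8>,
    pos: usize,
}

impl Read for PartialReader {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.pos == self.arrived.len() {
            // Not end-of-stream: more data may still arrive later.
            return Err(io::Error::new(ErrorKind::WouldBlock, "no data yet"));
        }
        let n = out.len().min(self.arrived.len() - self.pos);
        out[..n].copy_from_slice(&self.arrived[self.pos..self.pos + n]);
        self.pos += n;
        Ok(n)
    }
}

/// Hypothetical blocking-style decoder: it returns a whole value or an error
/// and has no way to say "call me again when more bytes are available".
fn decode_blocking<R: Read>(reader: &mut R) -> io::Result<u32> {
    let mut raw = [0u8; 4];
    // read_exact gives up on the first WouldBlock; the bytes it had already
    // pulled out of the reader are dropped along with `raw`.
    reader.read_exact(&mut raw)?;
    Ok(u32::from_le_bytes(raw))
}
```

If only two of the four bytes have arrived, decode_blocking fails with WouldBlock and the two bytes it already consumed are gone, so simply retrying later would decode garbage. That is why the data has to be buffered outside the decoder.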

Theoretically, it is possible to implement an async_deserialize_from, but not by using the interfaces provided by serde unless you read the full data to decode in advance, which would defeat the purpose.

You need to read the full data first. Use tokio_io::io::read_to_end if the stream ends right after the message, or tokio_io::io::read_exact (what you're currently using) if you know the size of the encoded data and the stream is "endless" or the message is followed by other data.
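The "read the full data first" approach can also be sketched without any futures machinery. The following std-only example is an illustration under stated assumptions, not the tokio_io API: encode_frame and FrameDecoder are hypothetical helpers, and the 8-byte little-endian length prefix is a choice made for this sketch. The decoder buffers whatever bytes have arrived and only hands a payload to the deserializer once the whole frame is present.

```rust
use std::convert::TryInto;

/// Hypothetical helper: prefix a payload with its length as a little-endian u64.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let mut frame = (payload.len() as u64).to_le_bytes().to_vec();
    frame.extend_from_slice(payload);
    frame
}

/// Hypothetical incremental decoder: feed it the bytes that have arrived and
/// ask for the next complete frame.
#[derive(Default)]
struct FrameDecoder {
    buf: Vec<u8>,
}

impl FrameDecoder {
    /// Append bytes that just arrived from the socket.
    fn feed(&mut self, bytes: &[u8]) {
        self.buf.extend_from_slice(bytes);
    }

    /// Return the next full payload, or None if more bytes are needed.
    fn next_frame(&mut self) -> Option<Vec<u8>> {
        const HEADER: usize = 8;
        if self.buf.len() < HEADER {
            return None; // length prefix itself is incomplete
        }
        let len = u64::from_le_bytes(self.buf[..HEADER].try_into().unwrap()) as usize;
        if self.buf.len() - HEADER < len {
            return None; // payload is incomplete
        }
        let payload = self.buf[HEADER..HEADER + len].to_vec();
        // Remove the consumed frame; any bytes of the next frame stay buffered.
        self.buf.drain(..HEADER + len);
        Some(payload)
    }
}
```

Each payload returned by next_frame is a complete message, so it can be passed to a slice-based deserializer such as bincode's deserialize. A real server would also cap the accepted length so that a corrupt or hostile prefix cannot force an arbitrarily large buffer.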

Upvotes: 4
