Reputation: 953
I have a tokio TcpStream. I want to pass some type T over this stream. This type T implements Serialize and Deserialize. How can I obtain a Sink<T> and a Stream<Item = T>?
I found the crates tokio_util and tokio_serde, but I can't figure out how to use them to do what I want.
Upvotes: 1
Views: 3125
Reputation: 1951
I don't know your code structure or the codec you're planning on using, but I've figured out how to glue everything together into a workable example.
Your Sink<T> and Stream<Item=T> are going to be provided by the Framed type in tokio-serde. This layer deals with passing your messages through serde. This type takes four generic parameters: Transport, Item (the stream item), SinkItem, and Codec. Codec is a wrapper for the specific serializer and deserializer you want to use. You can view the provided options in the tokio_serde::formats module documentation. Item and SinkItem are just going to be your message type, which must implement Deserialize and Serialize respectively. Transport needs to be a Sink and/or Stream of raw byte frames (Bytes going out, BytesMut coming in) itself in order for the frame to implement any useful traits. This is where tokio-util comes in. It provides various Framed* types which allow you to convert things implementing AsyncRead/AsyncWrite into streams and sinks respectively. In order to construct these frames, you need to specify a codec that delimits frames on the wire. For simplicity, in my example I just used the LengthDelimitedCodec, but there are other options provided as well.
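To make the framing step less abstract, here is a minimal sketch of what length-delimited framing does on the wire, using only the standard library. It assumes the layout that LengthDelimitedCodec uses in its default configuration as far as I know (a 4-byte big-endian length header followed by the payload); write_frame and read_frame are hypothetical helper names for illustration, not part of tokio-util.

```rust
use std::io::{self, Cursor, Read, Write};

/// Write one frame: a 4-byte big-endian length header, then the payload.
/// (Hypothetical helper; assumed to mirror LengthDelimitedCodec's defaults.)
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    if payload.len() > u32::MAX as usize {
        return Err(io::Error::new(io::ErrorKind::InvalidInput, "frame too large"));
    }
    writer.write_all(&(payload.len() as u32).to_be_bytes())?;
    writer.write_all(payload)
}

/// Read one frame. EOF while reading the header is treated as end of stream
/// and reported as Ok(None); EOF inside a payload is an error.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Option<Vec<u8>>> {
    let mut header = [0u8; 4];
    match reader.read_exact(&mut header) {
        Ok(()) => {}
        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
        Err(e) => return Err(e),
    }
    let len = u32::from_be_bytes(header) as usize;
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(Some(payload))
}

fn main() -> io::Result<()> {
    // An in-memory buffer stands in for the TCP connection.
    let mut wire = Vec::new();
    write_frame(&mut wire, br#"{"field":"ping"}"#)?;
    write_frame(&mut wire, br#"{"field":"pong"}"#)?;

    // Two messages were written back to back; the headers let the reader
    // split them apart again.
    let mut reader = Cursor::new(wire);
    while let Some(frame) = read_frame(&mut reader)? {
        println!("{}", String::from_utf8_lossy(&frame));
    }
    Ok(())
}
```

The serde layer then only has to turn each complete frame into a T (or a T into one frame), which is why tokio-serde needs a framed transport underneath it rather than a raw byte stream.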
Without further ado, here's an example of how you can take a tokio::net::TcpStream and split it into a Sink<T> and Stream<Item=T>. Note that the stream side actually yields a Result wrapping T, because the serde layer can fail if the message is malformed.
use futures::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::net::{
    tcp::{OwnedReadHalf, OwnedWriteHalf},
    TcpListener,
    TcpStream,
};
use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};

#[derive(Serialize, Deserialize, Debug)]
struct MyMessage {
    field: String,
}

type WrappedStream = FramedRead<OwnedReadHalf, LengthDelimitedCodec>;
type WrappedSink = FramedWrite<OwnedWriteHalf, LengthDelimitedCodec>;

// We use the unit type in place of the message types since we're
// only dealing with one half of the IO
type SerStream = Framed<WrappedStream, MyMessage, (), Json<MyMessage, ()>>;
type DeSink = Framed<WrappedSink, (), MyMessage, Json<(), MyMessage>>;

fn wrap_stream(stream: TcpStream) -> (SerStream, DeSink) {
    let (read, write) = stream.into_split();
    let stream = WrappedStream::new(read, LengthDelimitedCodec::new());
    let sink = WrappedSink::new(write, LengthDelimitedCodec::new());
    (
        SerStream::new(stream, Json::default()),
        DeSink::new(sink, Json::default()),
    )
}

#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("0.0.0.0:8080")
        .await
        .expect("Failed to bind server to addr");

    tokio::task::spawn(async move {
        let (stream, _) = listener
            .accept()
            .await
            .expect("Failed to accept incoming connection");
        let (mut stream, mut sink) = wrap_stream(stream);

        println!(
            "Server received: {:?}",
            stream
                .next()
                .await
                .expect("No data in stream")
                .expect("Failed to parse ping")
        );

        sink.send(MyMessage {
            field: "pong".to_owned(),
        })
        .await
        .expect("Failed to send pong");
    });

    let stream = TcpStream::connect("127.0.0.1:8080")
        .await
        .expect("Failed to connect to server");
    let (mut stream, mut sink) = wrap_stream(stream);

    sink.send(MyMessage {
        field: "ping".to_owned(),
    })
    .await
    .expect("Failed to send ping to server");

    println!(
        "Client received: {:?}",
        stream
            .next()
            .await
            .expect("No data in stream")
            .expect("Failed to parse pong")
    );
}
Running this example yields:
Server received: MyMessage { field: "ping" }
Client received: MyMessage { field: "pong" }
Note that you are not required to split the stream. You could instead construct a tokio_util::codec::Framed out of the TcpStream, wrap it in a tokio_serde::Framed with a tokio_serde::formats::SymmetricalJson<MyMessage> codec, and that single Framed value would then implement both Sink and Stream. Also, a lot of the functionality in this example is feature-gated (for instance the codec feature of tokio-util and the json feature of tokio-serde), so be sure to enable the appropriate features according to the docs.
Upvotes: 8