james pearson
james pearson

Reputation: 221

How to send a stream of websocket messages using the send_all method of tokio-tungstenite

I am using the excellent rust websocket library tokio-tungstenite, which is just the tokio version of tungstenite.

I can easily connect to a websocket stream using the connect_async method and in the websocket stream I get back I split it as follows:

connect_async(url).and_then(move |(ws_stream, _)| {

        let (sink, stream) = ws_stream.split();

I can easily send a single message down the sink using the send message like this:

sink.send(tungstenite::Message::Text("my message".to_string()))
.map_err(|e| ())
.wait();

That is just a contrived example. My problem is I want to send a stream of messages down the sink which I've learned is best accomplished using the send_all method. My attempt is:

let my_messsages : Vec<tungstenite::Message> = vec![
tungstenite::Message::Text("message_1".to_string()), 
tungstenite::Message::Text("message_2".to_string()) ];

let send_stream: tokio_tungstenite::WebSocketStream
<tokio_tungstenite::tungstenite::Message> 
= futures::stream::iter_ok(my_messages);

sink.send_all(stream).map_err(|e| ()).wait();

But it doesn't work, because futures::stream::iter_ok isn't the correct way to construct the stream I need, which is where I'm stuck and cannot see how to proceed from the documentation.

Edit: Originally I did not specify a type annotation for the send_stream and it gave me this error:

type annotations needed for `futures::stream::iter_ok::IterOk
<std::vec::IntoIter<tungstenite::protocol::message::Message>, E>`

cannot infer type for `E`

The current error is also to do with type annotations for the send stream.

Upvotes: 2

Views: 5283

Answers (1)

S&#233;bastien Renauld
S&#233;bastien Renauld

Reputation: 19662

Your problem lies squarely on this line:

let send_stream:tokio_tungstenite::WebSocketStream<tokio_tungstenite::tungstenite::Message> 
    = futures::stream::iter_ok(my_messages);

The definition of futures::stream::iter_ok is as follows:

pub fn iter_ok<I, E>(i: I) -> IterOk<<I as IntoIterator>::IntoIter, E> 
where
    I: IntoIterator, 

Notice that this new stream is an IterOk, not a tungstenite::WebsocketStream. By forcing the type (with your annotations), you are forcing the compiler to look for an implementation of Into<WebsocketStream> for IterOk, which there is none.

As a result, this fails.

This type hint was completely unnecessary as send_all is defined for impl Sink<SinkItem = _, SinkError = _>.

Without too much surprise, changing the line to:

let send_stream = futures::stream::iter_ok(my_messages)
  .map_err(|_:()| tungstenite::error::Error::Utf8)

should work flawlessly, as your sink expects tungstenite::Message and this new stream has that as an element (as it was built from a Vec<tungstenite::Message>). The map_err is there purely to coerce types and will never be called in practice.

Upvotes: 1

Related Questions