Reputation: 221
I am using the excellent rust websocket library tokio-tungstenite, which is just the tokio version of tungstenite.
I can easily connect to a websocket stream using the connect_async method and in the websocket stream I get back I split it as follows:
connect_async(url).and_then(move |(ws_stream, _)| {
let (sink, stream) = ws_stream.split();
I can easily send a single message down the sink using the send message like this:
sink.send(tungstenite::Message::Text("my message".to_string()))
.map_err(|e| ())
.wait();
That is just a contrived example. My problem is I want to send a stream of messages down the sink which I've learned is best accomplished using the send_all method. My attempt is:
let my_messsages : Vec<tungstenite::Message> = vec![
tungstenite::Message::Text("message_1".to_string()),
tungstenite::Message::Text("message_2".to_string()) ];
let send_stream: tokio_tungstenite::WebSocketStream
<tokio_tungstenite::tungstenite::Message>
= futures::stream::iter_ok(my_messages);
sink.send_all(stream).map_err(|e| ()).wait();
But it doesn't work, because futures::stream::iter_ok isn't the correct way to construct the stream I need, which is where I'm stuck and cannot see how to proceed from the documentation.
Edit: Originally I did not specify a type annotation for the send_stream and it gave me this error:
type annotations needed for `futures::stream::iter_ok::IterOk
<std::vec::IntoIter<tungstenite::protocol::message::Message>, E>`
cannot infer type for `E`
The current error is also to do with type annotations for the send stream.
Upvotes: 2
Views: 5283
Reputation: 19662
Your problem lies squarely on this line:
let send_stream:tokio_tungstenite::WebSocketStream<tokio_tungstenite::tungstenite::Message>
= futures::stream::iter_ok(my_messages);
The definition of futures::stream::iter_ok
is as follows:
pub fn iter_ok<I, E>(i: I) -> IterOk<<I as IntoIterator>::IntoIter, E>
where
I: IntoIterator,
Notice that this new stream is an IterOk
, not a tungstenite::WebsocketStream
. By forcing the type (with your annotations), you are forcing the compiler to look for an implementation of Into<WebsocketStream>
for IterOk
, which there is none.
As a result, this fails.
This type hint was completely unnecessary as send_all
is defined for impl Sink<SinkItem = _, SinkError = _>
.
Without too much surprise, changing the line to:
let send_stream = futures::stream::iter_ok(my_messages)
.map_err(|_:()| tungstenite::error::Error::Utf8)
should work flawlessly, as your sink expects tungstenite::Message
and this new stream has that as an element (as it was built from a Vec<tungstenite::Message>
). The map_err
is there purely to coerce types and will never be called in practice.
Upvotes: 1