Benjamin Reed
Benjamin Reed

Reputation: 433

How do I send a list of items through a futures::Sink?

I have a list of items to send through a futures::Sink:

let mut list = VecDeque::new();
/* add a bunch of Packet items to list */
let (sink, stream) = tcp_stream.framed(PacketCodec).split();

I can send one packet using

if let Some(first) = list.pop_front() {
    sink.send(first);
}

How do I send the whole list?

Upvotes: 4

Views: 6168

Answers (1)

Shepmaster
Shepmaster

Reputation: 430673

When using a new piece of software, I find it very useful to read the documentation before diving in too deep. The Rust community has, in general, provided pretty great resources.

For example, the Tokio project has an entire site of documentation which includes a selection of working examples. The generated API documentation for Sink is also invaluable here.

Another thing that I recommend all programmers learn to do is to create a MCVE of the problem. This allows them to focus on the core of the problem while also removing cruft and honing the words behind the problem. Here's one for this case:

use futures::{Sink, SinkExt}; // 0.3.4

fn thing(mut sink: impl Sink<i32>) {
    let mut all_the_things = vec![1, 2, 3, 4, 5];

    while let Some(v) = all_the_things.pop() {
        sink.send(v);
    }
}

We get the error message

error[E0277]: the trait bound `impl Sink<i32>: std::marker::Unpin` is not satisfied
 --> src/lib.rs:7:14
  |
3 | fn thing(mut sink: impl Sink<i32>) {
  |                    -------------- help: consider further restricting this bound: `impl Sink<i32> + std::marker::Unpin`
...
7 |         sink.send(v);
  |              ^^^^ the trait `std::marker::Unpin` is not implemented for `impl Sink<i32>`

Let's turn back to the API documentation for SinkExt::send...

fn send(&mut self, item: Item) -> Send<Self, Item>
where
    Self: Unpin, 

From this, we can see that send requires that the type implement Unpin, so let's change that:

use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;

fn thing(mut sink: impl Sink<i32> + Unpin) {
    let mut all_the_things = vec![1, 2, 3, 4, 5];

    while let Some(v) = all_the_things.pop() {
        sink.send(v);
    }
}

This compiles, but has a warning:

warning: unused `futures_util::sink::send::Send` that must be used
 --> src/lib.rs:8:9
  |
8 |         sink.send(v);
  |         ^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

Reviewing the API again, we see it returns a Send value. That's strange, isn't it? It's actually not that strange, once you see that Send implements Future - pushing something into the stream might block if the stream is full; this is the concept of back pressure. Having the result be a Future is how you can know when the item has actually been added to the stream.

One solution is to drive that future to completion with .await and making out function async:

use futures::{Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;

async fn thing(mut sink: impl Sink<i32> + Unpin) {
    let mut all_the_things = vec![1, 2, 3, 4, 5];

    while let Some(v) = all_the_things.pop() {
        sink.send(v)
            .await
            .unwrap_or_else(|_| panic!("Unable to send item"));
    }
}

This compiles, but isn't super pretty. Turning back to the documentation, we can also see Sink::send_all, which takes a TryStream. We can create a Stream from an iterator by using stream::iter and driving the future returned by send_all to completion:

use futures::{stream, Sink, SinkExt}; // 0.3.4
use std::marker::Unpin;

async fn thing(mut sink: impl Sink<i32> + Unpin) {
    let all_the_things = vec![1, 2, 3, 4, 5];
    let mut stream = stream::iter(all_the_things.into_iter().map(Ok));

    sink.send_all(&mut stream)
        .await
        .unwrap_or_else(|_| panic!("Unable to send item"));
}

Upvotes: 21

Related Questions