real-or-random

Reputation: 244

Splitting a futures::Stream into multiple streams based on a property of the stream item

I have a Stream of items (u32, Bytes), where the integer is an index in the range 0..n. I would like to split this stream into n streams, basically filtering by the integer.

I considered several possibilities, including

I feel that neither of these possibilities is convincing. The first one seems to create unnecessary overhead and the second one is just not elegant (if it even works, I am not sure).

What's a good way of splitting the stream?

Upvotes: 5

Views: 2313

Answers (2)

Lukazoid

Reputation: 19416

At one point I had a similar requirement and wrote a group_by operator for Stream.

I haven't published this to crates.io yet because I didn't feel it was ready for general use, but feel free to look at the code at https://github.com/Lukazoid/lz_stream_tools or to try it yourself.

Add the following to your Cargo.toml:

[dependencies]
lz_stream_tools = { git = "https://github.com/Lukazoid/lz_stream_tools" }

And add extern crate lz_stream_tools; to your crate root (main.rs or lib.rs).

Then from your code you may use it like so:

use lz_stream_tools::StreamTools;

let groups = some_stream.group_by(|x| x.0);

groups will now be a Stream of (u32, Stream<Item=Bytes>).

Upvotes: 3

djc

Reputation: 11711

You could use channels to represent the index-specific streams. You'd have to spawn one Task that pulls from the original stream and has a map of Senders.
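
A minimal sketch of that idea, under some loud assumptions: it uses only the standard library, so a std thread stands in for the Task, std::sync::mpsc channels stand in for the futures channels, and a plain iterator stands in for the original Stream. The function name split_by_index is hypothetical, and a Vec of Senders indexed by the integer plays the role of the map. With the futures crate you would presumably use its own mpsc channels, whose receivers implement Stream, in the same shape.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Hypothetical helper: routes each `(index, payload)` item from `source`
/// to the receiver at position `index`. Items whose index is >= `n` are
/// silently dropped in this sketch.
fn split_by_index<I, T>(source: I, n: usize) -> Vec<Receiver<T>>
where
    I: IntoIterator<Item = (u32, T)> + Send + 'static,
    T: Send + 'static,
{
    // One channel per index; the senders stay with the worker thread.
    let mut senders: Vec<Sender<T>> = Vec::with_capacity(n);
    let mut receivers: Vec<Receiver<T>> = Vec::with_capacity(n);
    for _ in 0..n {
        let (tx, rx) = channel();
        senders.push(tx);
        receivers.push(rx);
    }

    // The worker plays the role of the single Task that pulls from the
    // original stream and forwards each item to the matching sender.
    thread::spawn(move || {
        for (index, payload) in source {
            if let Some(tx) = senders.get(index as usize) {
                // A send error only means that receiver was dropped; ignore it.
                let _ = tx.send(payload);
            }
        }
        // `senders` is dropped when the closure returns, which closes every
        // channel and lets each receiver finish.
    });

    receivers
}
```

Each returned receiver yields only the payloads for its index, in their original relative order, and ends once the source is exhausted. The channels here are unbounded, so a slow consumer on one index does not block the others, at the cost of unbounded buffering.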

Upvotes: 1
