Reputation: 244
I have a Stream
of items (u32, Bytes)
where the integer is an index in the range 0..n
I would like to split this stream into n
streams, basically filtering by the integer.
I considered several possibilities, including
n
streams each of which peeks at the underlying stream to determine if the next item is for itn
sinks when they arrive, and then use the other side of the sink as a stream again. (This seems to be related to
Forwarding from a futures::Stream to a futures::Sink.).I feel that neither of these possibilities is convincing. The first one seems to create unnecessary overhead and the second one is just not elegant (if it even works, I am not sure).
What's a good way of splitting the stream?
Upvotes: 5
Views: 2313
Reputation: 19416
At one point I had a similar requirement and wrote a group_by
operator for Stream
.
I haven't yet published this to crates.io as I didn't really feel it was ready for consumption but feel free to take a look at the code at https://github.com/Lukazoid/lz_stream_tools or attempt to use it for yourself.
Add the following to your cargo.toml:
[dependencies]
lz_stream_tools = { git = "https://github.com/Lukazoid/lz_stream_tools" }
And extern crate lz_stream_tools;
to your bin.rs/lib.rs.
Then from your code you may use it like so:
use lz_stream_tools::StreamTools;
let groups = some_stream.group_by(|x| x.0);
groups
will now be a Stream
of (u32, Stream<Item=Bytes))
.
Upvotes: 3