Reputation: 16864
I have a stream of streams. I'd like to know how to transform this into a live count of how many inner streams are currently live, i.e. have not yet completed or errored.
How would I implement CountLiveStreams?
var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();
Thanks!
Upvotes: 3
Views: 158
Reputation: 117064
This works for me:
public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
    source
        .SelectMany(xs =>
            xs
                .Materialize()
                .Where(x => x.Kind != NotificationKind.OnNext)
                .Select(x => -1)
                .StartWith(1))
        .Scan((x, y) => x + y);
It uses the same strategy as Theodor's answer of producing 1 and -1 and then using .Scan to create a running tally, but I think it's clearer with a call to .Materialize().
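To see what this version emits, here is a small sketch that feeds it two subjects and records the tally. The names MaterializeCountDemo, Run, a and b are made up for illustration, and it assumes the System.Reactive package is referenced. Because Scan is used without a seed and nothing is prepended, the first value only appears once the first inner stream arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MaterializeCountExtensions
{
    // Same operator as above: +1 when an inner stream starts,
    // -1 when it terminates (OnCompleted or OnError), then a running sum.
    public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
        source
            .SelectMany(xs =>
                xs
                    .Materialize()
                    .Where(x => x.Kind != NotificationKind.OnNext)
                    .Select(x => -1)
                    .StartWith(1))
            .Scan((x, y) => x + y);
}

// Hypothetical demo harness, not part of the answer.
public static class MaterializeCountDemo
{
    public static List<int> Run()
    {
        var source = new Subject<IObservable<Unit>>();
        var seen = new List<int>();

        using (source.CountLiveStreams().Subscribe(n => seen.Add(n)))
        {
            var a = new Subject<Unit>();
            var b = new Subject<Unit>();

            source.OnNext(a);                                 // tally: 1
            source.OnNext(b);                                 // tally: 2
            a.OnNext(Unit.Default);                           // ordinary value, no change
            a.OnCompleted();                                  // tally: 1
            b.OnError(new InvalidOperationException("boom")); // tally: 0
        }

        return seen; // 1, 2, 1, 0
    }
}
```

Note that the error in b is counted as a termination and does not fault the resulting stream, since Materialize turns it into an ordinary notification.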
Upvotes: 2
Reputation: 43545
Here is an implementation of the CountLiveStreams operator:
public static IObservable<int> CountLiveStreams<T>(
    this IObservable<IObservable<T>> streamOfStreams)
{
    return streamOfStreams
        .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
            .Prepend(1).Append(-1))
        .Merge()
        .Scan(0, (accumulator, delta) => accumulator + delta)
        .Prepend(0);
}
Each emitted stream is converted to an IObservable<int> that emits two values: 1 at the start and -1 at the end, whether the stream completes or fails. All the value pairs generated from all streams are then merged into a single IObservable<int>, and finally these numbers are accumulated using the Scan operator. The trailing Prepend(0) makes the count start at 0 before any inner stream has arrived.
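As a rough usage sketch for this operator, the snippet below pushes two subjects through it and collects the counts. PrependCountDemo, Run, a and b are hypothetical names chosen for the example, and the System.Reactive package (version 4.0 or later, for Prepend and Append) is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PrependCountExtensions
{
    // Same operator as above: each inner stream becomes the pair (1, -1),
    // the pairs are merged, and a seeded Scan keeps the running sum.
    public static IObservable<int> CountLiveStreams<T>(
        this IObservable<IObservable<T>> streamOfStreams)
    {
        return streamOfStreams
            .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
                .Prepend(1).Append(-1))
            .Merge()
            .Scan(0, (accumulator, delta) => accumulator + delta)
            .Prepend(0);
    }
}

// Hypothetical demo harness, not part of the answer.
public static class PrependCountDemo
{
    public static List<int> Run()
    {
        var source = new Subject<IObservable<Unit>>();
        var seen = new List<int>();

        // Subscribing immediately yields the prepended 0.
        using (source.CountLiveStreams().Subscribe(n => seen.Add(n)))
        {
            var a = new Subject<Unit>();
            var b = new Subject<Unit>();

            source.OnNext(a);                                 // count: 1
            source.OnNext(b);                                 // count: 2
            a.OnNext(Unit.Default);                           // ignored by IgnoreElements
            a.OnCompleted();                                  // count: 1
            b.OnError(new InvalidOperationException("boom")); // swallowed by Catch, count: 0
        }

        return seen; // 0, 1, 2, 1, 0
    }
}
```

Calling PrependCountDemo.Run() from any entry point should return 0, 1, 2, 1, 0: the initial 0 comes from Prepend(0), and the failing stream b is counted down like a completed one instead of faulting the result.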
Upvotes: 5