James L
James L

Reputation: 16864

Count number of live concurrent streams

I have a stream of streams. I'd like to know to transform this into a live count of many inner streams are currently live, i.e. not completed or errored.

How would I implement CountLiveStreams?

var source = new Subject<IObservable<Unit>>();
IObservable<int> count = source.CountLiveStreams();

Thanks!

Upvotes: 3

Views: 158

Answers (2)

Enigmativity
Enigmativity

Reputation: 117064

This works for me:

public static IObservable<int> CountLiveStreams<T>(this IObservable<IObservable<T>> source) =>
    source
        .SelectMany(xs =>
            xs
                .Materialize()
                .Where(x => x.Kind != NotificationKind.OnNext)
                .Select(x => -1)
                .StartWith(1))
        .Scan((x, y) => x + y);

It uses the same strategy as Theodor's answer of producing 1 and -1 and then using .Scan to create a running tally, but I think it's clearer with a call to .Materialize().

Upvotes: 2

Theodor Zoulias
Theodor Zoulias

Reputation: 43545

Here is an implementation of the CountLiveStreams operator:

public static IObservable<int> CountLiveStreams<T>(
    this IObservable<IObservable<T>> streamOfStreams)
{
    return streamOfStreams
        .Select(x => x.IgnoreElements().Select(_ => 0).Catch(Observable.Empty<int>())
            .Prepend(1).Append(-1))
        .Merge()
        .Scan(0, (accumulator, delta) => accumulator + delta)
        .Prepend(0);
}

Each emitted stream is converted to an IObservable<int> that emits 2 values, the value 1 at the start and -1 at the end. Then all value-pairs generated from all streams are merged into a single IObservable<int>, and finally all these numbers are accumulated using the Scan operator.

Upvotes: 5

Related Questions