James L

Reputation: 16864

Split IObservable<T> into IObservable<IObservable<T>>

I have a source stream which sometimes emits a certain sentinel value to designate the beginning of a new stream. I'd like to convert my stream into IObservable<IObservable<T>>. Can anyone think of an elegant way?

Upvotes: 1

Views: 60

Answers (1)

Theodor Zoulias

Reputation: 43464

This should do the trick:

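    // Share a single subscription: the source is subscribed to twice below
    // (once for the values, once for the window boundaries).
    observable = observable
        .Publish()
        .RefCount();
    // Open a new window at each sentinel, then drop the sentinels themselves.
    var splitted = observable
        .Window(observable.Where(x => x == SENTINEL))
        .Select(c => c.Where(x => x != SENTINEL));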

Complete example:

    const int SENTINEL = -1;
    var observable = Observable
        .Interval(TimeSpan.FromMilliseconds(100))
        .Select(x => x + 1)
        .Take(12)
        .Select(x => x % 5 == 0 ? SENTINEL : x) // Every fifth is a sentinel
        .Publish()
        .RefCount();
    observable
        .Window(observable.Where(x => x == SENTINEL))
        .Select(c => c.Where(x => x != SENTINEL))
        .Select((c, i) => c.Select(x => (i, x))) // Embed the index of the subsequence
        .Merge() // Merge them again
        .Do(x => Console.WriteLine($"Received: {x}"))
        .Subscribe();
    await observable.LastOrDefaultAsync(); // Wait for it to end

Output:

    Received: (0, 1)
    Received: (0, 2)
    Received: (0, 3)
    Received: (0, 4)
    Received: (1, 6)
    Received: (1, 7)
    Received: (1, 8)
    Received: (1, 9)
    Received: (2, 11)
    Received: (2, 12)

Upvotes: 1
