SuperJMN
SuperJMN

Reputation: 13972

Observable.Buffer depending on buffered items

I would like to buffer items in a sequence according to a condition. The problem is that this condition depends on the items that are processed.

Let me put an example:

Given this:

new[] { 1, 3, 5, 7, 2, 4, 6, 8, 1 };

This way, the result sequence should be:

{ 1 }
{ 3 }
{ 5 }
{ 7 }
{ 2, 4, 6, 8 }
{ 1 }

I've tried variations of this without success:

var boundaries = origin.Select(x => x % 2 != 0).DistinctUntilChanged();
var result = origin.Buffer(boundaries);

Upvotes: 1

Views: 234

Answers (1)

Theodor Zoulias
Theodor Zoulias

Reputation: 43515

This might be close to what you want. Instead of the Buffer operator it uses the GroupByUntil operator, which I consider to be more reliable.

/// <summary>
/// Splits the elements of a sequence into chunks that are starting with
/// elements that satisfy the predicate.
/// </summary>
public static IObservable<IList<TSource>> BufferByPredicate<TSource>(
    this IObservable<TSource> source,
    Predicate<TSource> startNewBufferPredicate)
{
    ArgumentNullException.ThrowIfNull(source);
    ArgumentNullException.ThrowIfNull(startNewBufferPredicate);
    return source
        .SelectMany(x =>
        {
            var subSequence = Observable.Return((Value: x, HasValue: true));
            if (startNewBufferPredicate(x))
                // Add a fake "boundary" element before the real element.
                subSequence = subSequence.Prepend((default, false));
            return subSequence;
        })
        .GroupByUntil(_ => 0, g => g.SkipWhile(e => e.HasValue))
        .SelectMany(g => g.Where(e => e.HasValue).Select(e => e.Value).ToArray())
        .Where(w => w.Length > 0);
}

Usage example:

 IObservable<IList<int>> result = origin.BufferByPredicate(x => x % 2 != 0);

Upvotes: 1

Related Questions