jack

Reputation: 157

How to transform observable based on predicate involving first element

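I'm trying to create an Rx.NET operator that takes an IObservable<string> and:

  1. forwards every element unchanged if the first element is "a";
  2. otherwise emits only a completion notification.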

For example:

-a-b-c-d-|- --> -a-b-c-d-|-

-b-c-d-|- --> -|-

How can I do this?

Upvotes: 1

Views: 446

Answers (2)

Enigmativity

Reputation: 117084

Here's a version that definitely doesn't have a race condition:

public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
    source
        .Publish(published =>
            from x in published.Take(1)
            from y in
                // Null-safe comparison; x.Equals(...) would throw if x is null.
                EqualityComparer<T>.Default.Equals(x, expectedFirstElement)
                ? published.StartWith(x)
                : Observable.Empty<T>()
            select y);

Here's the method syntax version:

public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
    source
        .Publish(published =>
            published
                .Take(1)
                .SelectMany(x =>
                    // Null-safe comparison; x.Equals(...) would throw if x is null.
                    EqualityComparer<T>.Default.Equals(x, expectedFirstElement)
                    ? published.StartWith(x)
                    : Observable.Empty<T>()));

I prefer the query syntax, but hey...

Upvotes: 2

Theodor Zoulias

Reputation: 43554

Here is one way to do it:

/// <summary>
/// If the first element has the expected value, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
    T expectedFirstElement, IEqualityComparer<T> comparer = default)
{
    comparer ??= EqualityComparer<T>.Default;
    return source.Publish(published =>
        published
            .Where(x => !comparer.Equals(x, expectedFirstElement))
            .Take(1)
            .IgnoreElements()
            .Amb(published)
    );
}

This implementation uses the Amb operator (short for “ambiguous”), which takes two sequences and propagates the sequence that reacts first.

  1. If the first element has the expected value, the first sequence (the published.Where+Take+IgnoreElements) does not react, so the second sequence (the published, which is the whole sequence) is propagated. At this point the first sequence is unsubscribed, so the comparer.Equals method will not be invoked for subsequent elements.
  2. If the first element does not have the expected value, the first sequence emits a completion notification, which is propagated by the Amb operator, and the second sequence (the whole sequence) is ignored.
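The two cases above can be reproduced in isolation with a pair of subjects. This is only a minimal sketch: the AmbDemo class and its method names are made up for illustration, and the subjects stand in for the two branches of the published sequence. It does not exercise the case where both sequences react to the same element.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AmbDemo
{
    // Case 1: the right sequence reacts first, so Amb propagates it.
    public static List<string> RightReactsFirst()
    {
        var left = new Subject<string>();
        var right = new Subject<string>();
        var log = new List<string>();
        using var subscription = left.Amb(right)
            .Subscribe(log.Add, () => log.Add("|"));

        right.OnNext("a");   // right wins; Amb unsubscribes from left
        left.OnNext("x");    // ignored
        right.OnNext("b");
        right.OnCompleted();
        return log;          // a, b, |
    }

    // Case 2: the left sequence completes first. A completion
    // notification counts as a reaction, so Amb propagates it.
    public static List<string> LeftCompletesFirst()
    {
        var left = new Subject<string>();
        var right = new Subject<string>();
        var log = new List<string>();
        using var subscription = left.Amb(right)
            .Subscribe(log.Add, () => log.Add("|"));

        left.OnCompleted();  // left wins with an empty sequence
        right.OnNext("b");   // ignored
        return log;          // |
    }
}
```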

Usage example:

IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");

Note: This implementation assumes that when both sequences react at the same time, the Amb operator consistently selects the first sequence. This is not mentioned in the documentation, which states only that "The Amb operator uses parallel processing to detect which sequence yields the first item". The source code is quite complex, so I can't derive this guarantee by reading it. If you want something more reliable, you could use this as the body of the method instead (after the comparer ??= line):

return Observable.Create<T>(observer =>
{
    bool first = true;
    return source.Subscribe(item =>
    {
        if (first)
        {
            first = false;
            if (!comparer.Equals(item, expectedFirstElement))
            {
                observer.OnCompleted();
                return;
            }
        }
        observer.OnNext(item);
    }, observer.OnError, observer.OnCompleted);
});
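For completeness, here is a sketch of how that fragment might sit inside the full extension method, run against the two marble diagrams from the question. The ObservableExtensions and Demo classes and the Render helper are hypothetical names added for illustration; Render blocks until the sequence completes and formats it roughly like the diagrams.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
        T expectedFirstElement, IEqualityComparer<T> comparer = default)
    {
        comparer ??= EqualityComparer<T>.Default;
        return Observable.Create<T>(observer =>
        {
            bool first = true;
            return source.Subscribe(item =>
            {
                if (first)
                {
                    first = false;
                    if (!comparer.Equals(item, expectedFirstElement))
                    {
                        // Wrong first element: complete without emitting anything.
                        observer.OnCompleted();
                        return;
                    }
                }
                observer.OnNext(item);
            }, observer.OnError, observer.OnCompleted);
        });
    }
}

public static class Demo
{
    // Hypothetical helper: waits for completion, then joins the elements.
    public static string Render(IObservable<string> sequence) =>
        string.Join("-", sequence.ToList().Wait()) + "|";

    public static void Main()
    {
        var startsWithA = new[] { "a", "b", "c", "d" }.ToObservable();
        var startsWithB = new[] { "b", "c", "d" }.ToObservable();

        Console.WriteLine(Render(startsWithA.IfFirstElement("a"))); // a-b-c-d|
        Console.WriteLine(Render(startsWithB.IfFirstElement("a"))); // |
    }
}
```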

Upvotes: 2
