David Pfeffer

Reputation: 39871

Switch to a different IObservable if the first is empty

I'm writing a function that retrieves news about a subject and feeds this news back via an IObservable return value.

However, I have several news sources. I don't want to use Merge to combine these sources into one. Instead, I'd like to order them by priority:

  1. When my function gets called, the first news source is queried (which produces an IObservable representing that source).
  2. If that news source's IObservable completes without returning any results, the next news source is queried.
  3. If that second source completes without returning results, the final news source is queried.
  4. This whole behavior is wrapped up into an observable I can return to the user.

Is this sort of behavior something I can accomplish using built-in Rx extension methods, or do I need to implement a custom class to handle this? How would I approach doing either?

Upvotes: 7

Views: 2631

Answers (5)

Theodor Zoulias

Reputation: 44026

Here is a non-blocking version of JerKimball's SwitchIfEmpty operator.

/// <summary>Returns the elements of the first sequence, or the elements of the
/// second sequence if the first sequence is empty.</summary>
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> first,
    IObservable<T> second)
{
    return Observable.Defer(() =>
    {
        bool isEmpty = true;
        return first
            .Do(_ => isEmpty = false)
            .Concat(Observable.If(() => isEmpty, second));
    });
}

And here is a version of the same operator that accepts multiple sequences, and returns the elements of the first non-empty sequence:

/// <summary>Returns the elements of the first non-empty sequence.</summary>
public static IObservable<T> SwitchIfEmpty<T>(params IObservable<T>[] sequences)
{
    return Observable.Defer(() =>
    {
        bool isEmpty = true;
        return sequences
            .Select(s => s.Do(_ => isEmpty = false))
            .Select(s => Observable.If(() => isEmpty, s))
            .Concat();
    });
}

The Observable.Defer operator is used to prevent multiple subscriptions from sharing the same bool isEmpty state (more info about this here).
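To make the effect of Defer concrete, here is a minimal sketch that subscribes to the same query twice. It assumes the System.Reactive NuGet package; the class name DeferDemo and the helper CollectTwice are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class DeferDemo
{
    // The multi-sequence operator from above, repeated so this sketch compiles on its own.
    public static IObservable<T> SwitchIfEmpty<T>(params IObservable<T>[] sequences)
    {
        return Observable.Defer(() =>
        {
            // Created once per subscription, because Defer runs this factory on every Subscribe.
            bool isEmpty = true;
            return sequences
                .Select(s => s.Do(_ => isEmpty = false))
                .Select(s => Observable.If(() => isEmpty, s))
                .Concat();
        });
    }

    // Hypothetical helper: subscribes to the same query twice and records what each subscription sees.
    public static List<string> CollectTwice()
    {
        var query = SwitchIfEmpty(
            Observable.Empty<string>(),
            Observable.Return("Article from B"),
            Observable.Return("Article from C"));

        var results = new List<string>();
        foreach (var label in new[] { "first", "second" })
        {
            // ToList().Wait() blocks until completion; it is used here only to gather demo output.
            foreach (var item in query.ToList().Wait())
            {
                results.Add(label + ": " + item);
            }
        }
        return results;
    }
}
```

Both subscriptions should receive "Article from B". Without the Defer wrapper the flag would be created only once, so the second subscription would find it already set to false and skip every source.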

Upvotes: 0

Taylor Buchanan

Reputation: 4757

The accepted answer is undesirable in my opinion because it uses Subject, Do, and still subscribes to the second sequence when the first isn't empty. The latter can be a big problem if the second observable invokes anything nontrivial. I came up with the following solution instead:

public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
{
    if (@this == null) throw new ArgumentNullException(nameof(@this));
    if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
    return Observable.Create<T>(obs =>
    {
        var source = @this.Replay(1);
        var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
        return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
    });
}

The name SwitchIfEmpty falls in line with the existing RxJava implementation. Here is an ongoing discussion about incorporating some of the RxJava operators into Rx.NET.

I'm sure a custom IObservable implementation would be much more efficient than mine. You can find one here written by ReactiveX member akarnokd. It's also available on NuGet.
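As a rough check of the claim that the fallback is only subscribed to when it is needed, the sketch below counts subscriptions to the second sequence. It assumes the System.Reactive NuGet package; LazySwitchDemo and CountFallbackSubscriptions are hypothetical names used only for this illustration.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class LazySwitchDemo
{
    // Same operator as above (null checks omitted), repeated so this sketch compiles on its own.
    public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
    {
        return Observable.Create<T>(obs =>
        {
            var source = @this.Replay(1);
            var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
            return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
        });
    }

    // Hypothetical helper: returns how often the fallback was subscribed to for a given primary source.
    public static int CountFallbackSubscriptions(IObservable<string> primary)
    {
        int subscriptions = 0;
        var fallback = Observable.Defer(() =>
        {
            // Runs once for every subscription made to the fallback.
            subscriptions++;
            return Observable.Return("Article from B");
        });

        // ToList().Wait() blocks until completion; it is used here only to drive the demo.
        primary.SwitchIfEmpty(fallback).ToList().Wait();
        return subscriptions;
    }
}
```

Calling CountFallbackSubscriptions(Observable.Return("Article from A")) should give 0, while passing Observable.Empty<string>() should give 1.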

Upvotes: 4

Brandon

Reputation: 39222

Edit from Original Poster:

I went with this answer, but turned it into an extension method:

/// <summary> Returns the elements of the first sequence, or the values in the second sequence if the first sequence is empty. </summary>
/// <param name="first"> The first sequence. </param>
/// <param name="second"> The second sequence. </param>
/// <typeparam name="T"> The type of elements in the sequence. </typeparam>
/// <returns> The <see cref="IObservable{T}"/> sequence. </returns>
public static IObservable<T> DefaultIfEmpty<T>(this IObservable<T> first, IObservable<T> second)
{
    var signal = new AsyncSubject<Unit>();
    var source1 = first.Do(item => { signal.OnNext(Unit.Default); signal.OnCompleted(); });
    var source2 = second.TakeUntil(signal);

    return source1.Concat(source2); // if source2 is cold, it won't invoke it until source1 is completed
}

The original answer:

This might do the trick.

var signal1 = new AsyncSubject<Unit>();
var signal2 = new AsyncSubject<Unit>();
var source1 = a.Do(item => { signal1.OnNext(Unit.Default); signal1.OnCompleted(); });
var source2 = b.Do(item => { signal2.OnNext(Unit.Default); signal2.OnCompleted(); }).TakeUntil(signal1);
var source3 = c.TakeUntil(signal2.Merge(signal1));

return Observable.Concat(source1, source2, source3);

Edit: whoops, need a separate signal for the 2nd source, and the 3rd doesn't need to signal anything. Edit2: Whoops...types. I'm used to RxJs :)

P.S. There are also less Rx-y ways to do it that probably take a bit less typing:

var gotResult = false;
var source1 = a();
// Defer postpones the check (and the call to b/c) until the previous source has completed
var source2 = Observable.Defer(() => gotResult ? Observable.Empty<T>() : b());
var source3 = Observable.Defer(() => gotResult ? Observable.Empty<T>() : c());
return Observable.Concat(source1, source2, source3).Do(_ => gotResult = true);

Upvotes: 1

JerKimball

Reputation: 16944

Another approach, drastically different from the others, so I'll spin up a new answer:

Here it is with all sorts of fun debugging lines:

public static IObservable<T> FirstWithValues<T>(this IEnumerable<IObservable<T>> sources)
{
    return Observable.Create<T>(obs =>
    {
        // these are neat - if you set its .Disposable property and it already
        // holds one, it'll auto-dispose the old one
        SerialDisposable disp = new SerialDisposable();
        // this will trigger our exit condition
        bool hadValues = false;
        // start on the first source (assumed to be in order of importance)
        var sourceWalker = sources.GetEnumerator();
        if (!sourceWalker.MoveNext())
        {
            // no sources at all: complete immediately
            obs.OnCompleted();
            return Disposable.Empty;
        }

        IObserver<T> checker = null;
        checker = Observer.Create<T>(v => 
            {
                // Hey, we got a value - pass to the "real" observer and note we 
                // got values on the current source
                Console.WriteLine("Got value on source:" + v.ToString());
                hadValues = true;
                obs.OnNext(v);
            },
            ex => {
                // pass any errors immediately back to the real observer
                Console.WriteLine("Error on source, passing to observer");
                obs.OnError(ex);
            },
            () => {
                // A source completed; if it generated any values, we're done;                    
                if(hadValues)
                {
                    Console.WriteLine("Source completed, had values, so ending");
                    obs.OnCompleted();
                }
                // Otherwise, we need to check the next source in line...
                else
                {
                    Console.WriteLine("Source completed, no values, so moving to next source");
                    if (sourceWalker.MoveNext())
                    {
                        disp.Disposable = sourceWalker.Current.Subscribe(checker);
                    }
                    else
                    {
                        // every source was empty, so the result is empty too
                        obs.OnCompleted();
                    }
                }
            });
        // kick it off by subscribing our..."walker?" to the first source
        disp.Disposable = sourceWalker.Current.Subscribe(checker);
        // return the SerialDisposable itself so that disposing also covers later sources
        return disp;
    });
}

Usage:

var query = new[]
{
    Observable.Defer(() => GetSource("A")), 
    Observable.Defer(() => GetSource("B")), 
    Observable.Defer(() => GetSource("C")), 
}.FirstWithValues();

Output:

Source A invoked
Got value on source:Article from A
Article from A
Source completed, had values, so ending

Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Got value on source:Article from B
Article from B
Source completed, had values, so ending

Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Source completed, no values, so moving to next source
Source C invoked
Got value on source:Article from C
Article from C
Source completed, had values, so ending

Upvotes: 1

JerKimball

Reputation: 16944

It sounds like you can just use a plain-old Amb query.

EDIT: based on comment, Amb won't do it - give this a whack:

public static IObservable<T> SwitchIfEmpty<T>(
     this IObservable<T> first, 
     Func<IObservable<T>> second)
{
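    // Note: FirstOrDefault() blocks until `first` yields a value or completes,
    // and `first` is subscribed to again when it is returned below.
    return first.IsEmpty().FirstOrDefault() ? second() : first;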
}

Test rig:

static Random r = new Random();
public IObservable<string> GetSource(string sourceName)
{
    Console.WriteLine("Source {0} invoked", sourceName);
    return r.Next(0, 10) < 5 
        ? Observable.Empty<string>() 
        : Observable.Return("Article from " + sourceName);
}

void Main()
{
    var query = GetSource("A")
        .SwitchIfEmpty(() => GetSource("B"))
        .SwitchIfEmpty(() => GetSource("C"));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }           
}

Some example runs:

Source A invoked
Article from A

Source A invoked
Source B invoked
Article from B

Source A invoked
Source B invoked
Source C invoked
Article from C

EDITEDIT:

You could also generalize it to this, I suppose:

public static IObservable<T> SwitchIf<T>(
    this IObservable<T> first, 
    Func<IObservable<T>, IObservable<bool>> predicate, 
    Func<IObservable<T>> second)
{
    return predicate(first).FirstOrDefault() 
        ? second() 
        : first;
}

Upvotes: 1
