Reputation: 39871
I'm writing a function that retrieves news about a subject and feeds this news back via an IObservable return value.
However, I have several news sources. I don't want to use Merge
to combine together these sources into one. Instead, what I'd like to do is order them by priority --
Is this sort of behavior something I can accomplish using built-in Rx extension methods, or do I need to implement a custom class to handle this? How would I approach doing either?
Upvotes: 7
Views: 2631
Reputation: 44026
Here is a non-blocking version of JerKimball's SwitchIfEmpty
operator.
/// <summary>Returns the elements of the first sequence, or the elements of the
/// second sequence if the first sequence is empty.</summary>
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> first,
IObservable<T> second)
{
return Observable.Defer(() =>
{
bool isEmpty = true;
return first
.Do(_ => isEmpty = false)
.Concat(Observable.If(() => isEmpty, second));
});
}
And here is a version of the same operator that accepts multiple sequences, and returns the elements of the first non-empty sequence:
/// <summary>Returns the elements of the first non-empty sequence.</summary>
public static IObservable<T> SwitchIfEmpty<T>(params IObservable<T>[] sequences)
{
return Observable.Defer(() =>
{
bool isEmpty = true;
return sequences
.Select(s => s.Do(_ => isEmpty = false))
.Select(s => Observable.If(() => isEmpty, s))
.Concat();
});
}
The Observable.Defer
operator is used to prevent multiple subscriptions from sharing the same bool isEmpty
state (more info about this here).
Upvotes: 0
Reputation: 4757
The accepted answer is undesirable in my opinion because it uses Subject
, Do
, and still subscribes to the second sequence when the first isn't empty. The latter can be a big problem if the second observable invokes anything nontrivial. I came up with the following solution instead:
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
{
if (@this == null) throw new ArgumentNullException(nameof(@this));
if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
return Observable.Create<T>(obs =>
{
var source = @this.Replay(1);
var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
});
}
The name SwitchIfEmpty
falls in line with the existing RxJava implementation. Here is an ongoing discussion about incorporating some of the RxJava operators into RxNET.
I'm sure a custom IObservable
implementation would be much more efficient than mine. You can find one here written by ReactiveX member akarnokd. It's also available on NuGet.
Upvotes: 4
Reputation: 39222
I went with this answer, but turned this into an extension method --
/// <summary> Returns the elements of the first sequence, or the values in the second sequence if the first sequence is empty. </summary>
/// <param name="first"> The first sequence. </param>
/// <param name="second"> The second sequence. </param>
/// <typeparam name="T"> The type of elements in the sequence. </typeparam>
/// <returns> The <see cref="IObservable{T}"/> sequence. </returns>
public static IObservable<T> DefaultIfEmpty<T>(this IObservable<T> first, IObservable<T> second)
{
var signal = new AsyncSubject<Unit>();
var source1 = first.Do(item => { signal.OnNext(Unit.Default); signal.OnCompleted(); });
var source2 = second.TakeUntil(signal);
return source1.Concat(source2); // if source2 is cold, it won't invoke it until source1 is completed
}
This might do the trick.
var signal1 = new AsyncSubject<Unit>();
var signal2 = new AsyncSubject<Unit>();
var source1 = a.Do(item => { signal1.onNext(Unit.Default); signal1.onCompleted(); });
var source2 = b.Do(item => { signal2.onNext(Unit.Default); signal2.onCompleted(); })).TakeUntil(signal1);
var source3 = c.TakeUntil(signal2.Merge(signal1));
return Observable.Concat(source1, source2, source3);
Edit: whoops, need a separate signal for the 2nd source and the 3rd doesnt need to signal anything. Edit2: Whoops...types. I'm used to RxJs :)
P.S. there's also less RX-y ways to do it that is probably a bit less typing:
var gotResult = false;
var source1 = a();
var source2 = Observable.Defer(() => return gotResult ? Observable.Empty<T>() : b());
var source3 = Observable.Defer(() => return gotResult ? Observable.Empty<T>() : c());
return Observable.Concat(source1, source2, source3).Do(_ => gotResult = true;);
Upvotes: 1
Reputation: 16944
Another approach - fairly drastic in difference to other, so I'll spin a new answer:
Here it is with all sorts of fun debugging lines:
public static IObservable<T> FirstWithValues<T>(this IEnumerable<IObservable<T>> sources)
{
return Observable.Create<T>(obs =>
{
// these are neat - if you set it's .Disposable field, and it already
// had one in there, it'll auto-dispose it
SerialDisposable disp = new SerialDisposable();
// this will trigger our exit condition
bool hadValues = false;
// start on the first source (assumed to be in order of importance)
var sourceWalker = sources.GetEnumerator();
sourceWalker.MoveNext();
IObserver<T> checker = null;
checker = Observer.Create<T>(v =>
{
// Hey, we got a value - pass to the "real" observer and note we
// got values on the current source
Console.WriteLine("Got value on source:" + v.ToString());
hadValues = true;
obs.OnNext(v);
},
ex => {
// pass any errors immediately back to the real observer
Console.WriteLine("Error on source, passing to observer");
obs.OnError(ex);
},
() => {
// A source completed; if it generated any values, we're done;
if(hadValues)
{
Console.WriteLine("Source completed, had values, so ending");
obs.OnCompleted();
}
// Otherwise, we need to check the next source in line...
else
{
Console.WriteLine("Source completed, no values, so moving to next source");
sourceWalker.MoveNext();
disp.Disposable = sourceWalker.Current.Subscribe(checker);
}
});
// kick it off by subscribing our..."walker?" to the first source
disp.Disposable = sourceWalker.Current.Subscribe(checker);
return disp.Disposable;
});
}
Usage:
var query = new[]
{
Observable.Defer(() => GetSource("A")),
Observable.Defer(() => GetSource("B")),
Observable.Defer(() => GetSource("C")),
}.FirstWithValues();
Output:
Source A invoked
Got value on source:Article from A
Article from A
Source completed, had values, so ending
Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Got value on source:Article from B
Article from B
Source completed, had values, so ending
Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Source completed, no values, so moving to next source
Source C invoked
Got value on source:Article from C
Article from C
Source completed, had values, so ending
Upvotes: 1
Reputation: 16944
It sounds like you can just use a plain-old Amb
query.
EDIT: based on comment, Amb
won't do it - give this a whack:
public static IObservable<T> SwitchIfEmpty<T>(
this IObservable<T> first,
Func<IObservable<T>> second)
{
return first.IsEmpty().FirstOrDefault() ? second() : first;
}
Test rig:
static Random r = new Random();
public IObservable<string> GetSource(string sourceName)
{
Console.WriteLine("Source {0} invoked", sourceName);
return r.Next(0, 10) < 5
? Observable.Empty<string>()
: Observable.Return("Article from " + sourceName);
}
void Main()
{
var query = GetSource("A")
.SwitchIfEmpty(() => GetSource("B"))
.SwitchIfEmpty(() => GetSource("C"));
using(query.Subscribe(Console.WriteLine))
{
Console.ReadLine();
}
}
Some example runs:
Source A invoked
Article from A
Source A invoked
Source B invoked
Article from B
Source A invoked
Source B invoked
Source C invoked
Article from C
EDITEDIT:
You could also generalize it to this, I suppose:
public static IObservable<T> SwitchIf<T>(
this IObservable<T> first,
Func<IObservable<T>, IObservable<bool>> predicate,
Func<IObservable<T>> second)
{
return predicate(first).FirstOrDefault()
? second()
: first;
}
Upvotes: 1