Reputation: 157
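I'm trying to create an Rx.NET operator that takes an IObservable<string>
and:
- forwards every element unchanged if the first element is "a";
- emits only a completion notification otherwise.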
For example:
-a-b-c-d-|- --> -a-b-c-d-|-
-b-c-d-|- --> -|-
How can I do this?
Upvotes: 1
Views: 446
Reputation: 117084
Here's a version that definitely doesn't have a race condition:
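    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
        source
            .Publish(published =>
                // Take(1) inspects only the first element of the shared sequence.
                from x in published.Take(1)
                from y in
                    // Null-safe comparison; x.Equals(...) would throw if x were null.
                    EqualityComparer<T>.Default.Equals(x, expectedFirstElement)
                        ? published.StartWith(x) // replay the first element, then forward the rest
                        : Observable.Empty<T>()  // complete immediately
                select y);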
Here's the method syntax version:
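    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source, T expectedFirstElement) =>
        source
            .Publish(published =>
                published
                    .Take(1)
                    .SelectMany(x =>
                        // Null-safe comparison; x.Equals(...) would throw if x were null.
                        EqualityComparer<T>.Default.Equals(x, expectedFirstElement)
                            ? published.StartWith(x) // replay the first element, then forward the rest
                            : Observable.Empty<T>())); // complete immediately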
I prefer the query syntax, but hey...
Upvotes: 2
Reputation: 43554
Here is one way to do it:
    /// <summary>
    /// If the first element has the expected value, return the whole sequence.
    /// Otherwise, return an empty sequence.
    /// </summary>
    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
        T expectedFirstElement, IEqualityComparer<T> comparer = default)
    {
        comparer ??= EqualityComparer<T>.Default;
        return source.Publish(published =>
            published
                .Where(x => !comparer.Equals(x, expectedFirstElement))
                .Take(1)
                .IgnoreElements()
                .Amb(published)
        );
    }
This implementation uses the Amb operator (short for “ambiguous”), which takes two sequences and propagates the sequence that reacts first.
If the first element has the expected value, the first sequence (published.Where+Take+IgnoreElements) does not react, so the second sequence is propagated (published, which is the whole sequence). At this point the first sequence is unsubscribed, so the comparer.Equals method will not be invoked for subsequent elements.
If the first element does not have the expected value, the first sequence emits a completion notification, which is propagated by the Amb operator, and the second sequence (the whole sequence) is ignored.
Usage example:
IObservable<string> original = new string[] { "a", "b", "c", "d" }.ToObservable();
IObservable<string> transformed = original.IfFirstElement("a");
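To make the "propagates the sequence that reacts first" behavior of Amb concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced; the names AmbDemo, Run, left, and right are hypothetical and exist only for illustration. It covers the simple case where one sequence clearly reacts before the other, not the case where both react at the same time.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AmbDemo
{
    // Hypothetical helper: returns every notification that Amb lets through.
    public static List<string> Run()
    {
        var log = new List<string>();
        var left = new Subject<string>();
        var right = new Subject<string>();

        left.Amb(right).Subscribe(x => log.Add(x), () => log.Add("|"));

        right.OnNext("r1");   // right reacts first, so Amb commits to it
        left.OnNext("l1");    // dropped: Amb has already unsubscribed from left
        right.OnNext("r2");
        right.OnCompleted();
        return log;
    }

    public static void Main() =>
        Console.WriteLine(string.Join("-", Run())); // prints r1-r2-|
}
```

In the operator above, the "left" role is played by the Where+Take+IgnoreElements chain, whose only possible reaction is a completion notification.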
Note: This implementation assumes that when both sequences react at the same time, the Amb operator consistently selects the first sequence. The documentation does not mention this; it states only that "The Amb operator uses parallel processing to detect which sequence yields the first item". The source code is quite complex, so I can't derive this guarantee by reading it. If you want something more reliable, you could try this implementation instead:
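    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
        T expectedFirstElement, IEqualityComparer<T> comparer = default)
    {
        comparer ??= EqualityComparer<T>.Default;
        return Observable.Create<T>(observer =>
        {
            bool first = true; // true until the first element has been inspected
            return source.Subscribe(item =>
            {
                if (first)
                {
                    first = false;
                    if (!comparer.Equals(item, expectedFirstElement))
                    {
                        // Wrong first element: complete without emitting anything.
                        observer.OnCompleted();
                        return;
                    }
                }
                observer.OnNext(item);
            }, observer.OnError, observer.OnCompleted);
        });
    }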
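As a quick check against the two marble diagrams in the question, the following sketch runs this Observable.Create-based operator on both inputs. It assumes the System.Reactive NuGet package is referenced; the class name IfFirstElementDemo and the Record helper are hypothetical, and the operator body is repeated only so that the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class IfFirstElementDemo
{
    // Same logic as the Observable.Create version, repeated for self-containment.
    public static IObservable<T> IfFirstElement<T>(this IObservable<T> source,
        T expectedFirstElement, IEqualityComparer<T> comparer = default)
    {
        comparer ??= EqualityComparer<T>.Default;
        return Observable.Create<T>(observer =>
        {
            bool first = true;
            return source.Subscribe(item =>
            {
                if (first)
                {
                    first = false;
                    if (!comparer.Equals(item, expectedFirstElement))
                    {
                        observer.OnCompleted();
                        return;
                    }
                }
                observer.OnNext(item);
            }, observer.OnError, observer.OnCompleted);
        });
    }

    // Hypothetical helper: records every notification as text ("|" marks completion).
    public static List<string> Record(IObservable<string> sequence)
    {
        var log = new List<string>();
        sequence.Subscribe(
            x => log.Add(x),
            ex => log.Add("error: " + ex.Message),
            () => log.Add("|"));
        return log;
    }

    public static void Main()
    {
        var matching = new[] { "a", "b", "c", "d" }.ToObservable();
        var other = new[] { "b", "c", "d" }.ToObservable();

        Console.WriteLine(string.Join("-", Record(matching.IfFirstElement("a")))); // prints a-b-c-d-|
        Console.WriteLine(string.Join("-", Record(other.IfFirstElement("a"))));    // prints |
    }
}
```

Record relies on these array-backed sources delivering all their notifications before Subscribe returns; for an asynchronous source the log would have to be read after completion instead.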
Upvotes: 2