Reputation: 15715
When I don't know how to generate some enumerable using LINQ, I just write my own extension method and use the yield keyword. This gives me a closure where I can store things such as a counter or other aggregate values.
For an IObservable, I don't know if there is a built-in way to do this. I recently wanted to generate an observable which, given another IObservable<string>, ignores everything until a starting value (say "start") appears in the source, and then emits everything until the source yields an ending value (say "end"), at which point it goes back to ignoring.
So if, for instance, my source was {"1", "start", "2", "3", "end", "4", "start", "5", "end"}, the new observable should be {"2", "3", "5"}.
There may be a way to do this using IObservable's built-in methods, but it would be straightforward to do using the yield keyword if it were an IEnumerable. So I wonder if there is a similarly straightforward way to do it for an IObservable.
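For comparison, here is a minimal sketch of what the IEnumerable version with yield could look like. The name BetweenMarkers and its default parameters are made up for illustration, and the sketch assumes that a repeated "start" inside an open range is simply dropped.

```csharp
using System;
using System.Collections.Generic;

public static class EnumerableSketch
{
    // Hypothetical helper (not part of LINQ): yields only the items that
    // appear between a start marker and the next end marker.
    public static IEnumerable<string> BetweenMarkers(
        this IEnumerable<string> source, string start = "start", string end = "end")
    {
        // The iterator's local variable plays the role of the closure state.
        bool running = false;
        foreach (var value in source)
        {
            if (value == end)
                running = false;        // close the range; the marker itself is not emitted
            else if (value == start)
                running = true;         // open the range; the marker itself is not emitted
            else if (running)
                yield return value;     // inside a range: pass the value through
        }
    }
}
```

Calling new[] {"1", "start", "2", "3", "end", "4", "start", "5", "end"}.BetweenMarkers() yields "2", "3", "5". Each enumeration gets its own running flag, which is the per-subscription behaviour the observable version would need to reproduce.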
I came up with this small class that basically does the job:
public class ClosureSelectMany<TSource, TResult> : IObservable<TResult>
{
    private readonly IObservable<TSource> _Source;
    private readonly Func<Func<TSource, IObservable<TResult>>> _Selector;

    public ClosureSelectMany(IObservable<TSource> source, Func<Func<TSource, IObservable<TResult>>> selector)
    {
        _Source = source;
        _Selector = selector;
    }

    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        // Invoke the factory once per subscription so each subscriber gets its own closure state.
        var selector = _Selector();
        return _Source.SelectMany(selector).Subscribe(observer);
    }
}

public static class ObservableHelpers
{
    public static IObservable<TResult> ClosureSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<Func<TSource, IObservable<TResult>>> selector)
    {
        return new ClosureSelectMany<TSource, TResult>(source, selector);
    }
}
I can then use it like:
test = input.ClosureSelectMany<string, string>(() =>
{
    bool running = false;
    return val =>
    {
        if (val == "end")
            running = false;
        var result = running ? Observable.Return(val) : Observable.Empty<string>();
        if (val == "start")
            running = true;
        return result;
    };
});
This looks like what I would write with the yield keyword for an IEnumerable.
So I wonder if I'm reinventing the wheel and there is already some built-in function that does this, and if not, why. Perhaps this approach could cause some other problem I'm not seeing right now.
Upvotes: 1
Views: 1180
Reputation: 322
First, I would like to clarify that the focus here is on the alternative to yield return.
Second, this particular example problem could be solved like this:
bool running = false;
Observable.ToObservable(new [] {"1", "start", "2", "3", "end", "4", "start", "5", "end"})
    .Where(s => {
        if (s == "start") running = true;
        if (s == "end") running = false;
        return (running && s != "start");
    })
    .Dump();
Now, to the answer:
Remember that yield return is just syntactic sugar that saves you from having to implement IEnumerable and IEnumerator yourself.
You could think of it as a combination of IEnumerator.MoveNext() and IEnumerator.Current, without the need of an extra class to save the state of the enumerator, as explained in MSDN docs (https://msdn.microsoft.com/en-us/library/9k7k7cf0.aspx).
You may also have read that the IObservable is the asynchronous/push "dual" to the synchronous/pull IEnumerable (http://reactivex.io/intro.html).
With all that in mind, you could think that the IObservable equivalent to yield return (IEnumerator.MoveNext() + IEnumerator.Current) is the IObserver.OnNext(T value) method.
Thinking that way, you could easily implement what you need as in this sample (tested in LINQPad):
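// LINQPad script; assumes the System.Reactive package with the namespaces
// System.Reactive.Linq and System.Reactive.Disposables imported.
void Main()
{
    var q = FilteredSource.Generate();
    q.Dump();
}
public class FilteredSource
{
    public static IObservable<string> Generate()
    {
        var q = from s in OriginalSource.Generate()
                select s;
        return Observable.Create<string>(
            async observer =>
            {
                // Per-subscription state, like a local variable in an iterator method.
                bool produce = false;
                try
                {
                    await q.ForEachAsync(s => {
                        if (s == "start")
                            produce = true;
                        if (s == "end")
                            produce = false;
                        if (produce && s != "start")
                            observer.OnNext(s);
                    });
                    // Signal completion only when the source finished without an error.
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
                return Disposable.Empty;
            });
    }
}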
public class OriginalSource
{
    public static IObservable<string> Generate()
    {
        return Observable.Create<string>(
            observer =>
            {
                try
                {
                    string[] list = {"1", "start", "2", "3", "end", "4", "start", "5", "end"};
                    foreach (string s in list)
                        observer.OnNext(s);
                    // Signal completion only when every item was pushed without an error.
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
                return Disposable.Empty;
            });
    }
}
Upvotes: 1
Reputation: 117027
There is no yield return equivalent for observables. It would certainly be something that could be baked into the compiler, but it's just not available at this point in time.
However, with the available operators it is quite easy to do what you want.
This worked for me:
var results =
    source
        .Publish(ss =>
            ss
                .Window(
                    ss.Where(s0 => s0 == "start"),
                    s => ss.Where(s1 => s1 == "end"))
                .Select(xs => xs.Skip(1).SkipLast(1))
                .Merge());
Given this source:
var source = new []
{
    "1", "start", "2", "3", "end", "4", "start", "5", "end"
}.ToObservable();
I get this output:
2
3
5
Upvotes: 2
Reputation: 2652
You can use Observable.Create instead of composing existing operators together, though composition is generally a much better idea whenever possible.
Note that Create also has overloads that accept Task-returning functions, allowing you to define an async iterator: a coroutine that uses await instead of yield, although that's not what you need here in particular.
However, your problem can be solved by composing Scan, Where and Select:
xs.Scan(
        new { Running = false, Value = default(string) },
        (previous, current) => new
        {
            Running = current == "start" || (previous.Running && current != "end"),
            Value = current
        })
    .Where(state => state.Running && state.Value != "start")
    .Select(state => state.Value);
Upvotes: 2