Reputation: 5067
Observable.TakeWhile allows you to run a sequence as long as a condition is true (using a delegate so we can perform computations on the actual sequence objects), but it's checking this condition BEFORE each element. How can I perform the same check but AFTER each element?
The following code demonstrates the problem
void RunIt()
{
List<SomeCommand> listOfCommands = new List<SomeCommand>();
listOfCommands.Add(new SomeCommand { CurrentIndex = 1, TotalCount = 3 });
listOfCommands.Add(new SomeCommand { CurrentIndex = 2, TotalCount = 3 });
listOfCommands.Add(new SomeCommand { CurrentIndex = 3, TotalCount = 3 });
var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex != c.TotalCount);
obs.Subscribe(x =>
{
Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
});
}
class SomeCommand
{
public int CurrentIndex;
public int TotalCount;
}
This outputs
1 of 3
2 of 3
I can't get the third element
Looking at this example, you may think all I have to do is change my condition like so -
var obs = listOfCommands.ToObservable().TakeWhile(c => c.CurrentIndex <= c.TotalCount);
But then the observable will never complete (because in my real world code, the stream doesn't end after those three commands)
Upvotes: 8
Views: 7751
Reputation: 319
Perhaps the following way will be useful to someone. You must use the "Do" method and the empty "Subscribe" method.
listOfCommands.ToObservable()
.Do(x =>
{
Debug.WriteLine("{0} of {1}", x.CurrentIndex, x.TotalCount);
})
.TakeWhile(c => c.CurrentIndex != c.TotalCount)
.Subscribe();
This way you get the result without writing your own extensions.
Upvotes: -1
Reputation: 1960
Combo, using a new SkipUntil
and TakeUntil
:
SkipUntil
return source.Publish(s => s.SkipUntil(s.Where(predicate)));
TakeUntil (inclusive)
return source.Publish(s => s.TakeUntil(s.SkipUntil(predicate)));
Full source: https://gist.github.com/GeorgeTsiokos/a4985b812c4048c428a981468a965a86
Upvotes: 0
Reputation: 5067
Final edit:
I based my solution off of Sergey's TakeWhileInclusive implementation in this thread - How to complete a Rx Observable depending on a condition in a event
public static IObservable<TSource> TakeUntil<TSource>(
this IObservable<TSource> source, Func<TSource, bool> predicate)
{
return Observable
.Create<TSource>(o => source.Subscribe(x =>
{
o.OnNext(x);
if (predicate(x))
o.OnCompleted();
},
o.OnError,
o.OnCompleted
));
}
Upvotes: 6
Reputation: 84734
There's no built in operators to do what you're asking, but here's one that uses Publish
to run two queries while only subscribing to the underlying observable once:
// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
this IObservable<T> source, Func<T, bool> predicate)
{
return source.Publish(co => co.TakeWhile(predicate)
.Merge(co.SkipWhile(predicate).Take(1)));
}
And then:
var obs = listOfCommands.ToObservable()
.TakeWhileInclusive(c.CurrentIndex != c.TotalCount);
Upvotes: 15
Reputation: 7919
You can use the TakeUntil
operator to take every item until a secondary source produces a value; in this case we can specify the second stream to be the first value after the predicate passes:
public static IObservable<TSource> TakeWhileInclusive<TSource>(
this IObservable<TSource> source,
Func<TSource, bool> predicate)
{
return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
}
Upvotes: 2
Reputation: 16894
I think you're after TakeWhile
, not TakeUntil
:
var list = (new List<int>(){1,2,3,4,5,6,7,8,9,10});
var takeWhile = list
.ToObservable()
.Select((_, i) => Tuple.Create(i, _))
.TakeWhile(tup => tup.Item1 < list.Count)
.Do(_ => Console.WriteLine("Outputting {0}", _.Item2));
Ok, the thing you want doesn't exist out of the box, at least I'm not aware of something with that particular syntax. That said, you can cobble it together fairly easily (and it's not too nasty):
var fakeCmds = Enumerable
.Range(1, 100)
.Select(i => new SomeCommand() {CurrentIndex = i, TotalCount = 10})
.ToObservable();
var beforeMatch = fakeCmds
.TakeWhile(c => c.CurrentIndex != c.TotalCount);
var theMatch = fakeCmds
.SkipWhile(c => c.CurrentIndex != c.TotalCount)
.TakeWhile(c => c.CurrentIndex == c.TotalCount);
var upToAndIncluding = Observable.Concat(beforeMatch, theMatch);
Upvotes: 1