Reputation: 77
I have an observable sequence that should be filtered with the Where operator based on some criteria, and the filtered elements should populate a list.
I want to change the filtering criteria on the fly. When that happens, the list should be cleared, and the previous elements of the observable sequence should be re-evaluated against the new criteria, along with any new elements the sequence produces.
The list should be repopulated with the past elements that now pass the new criteria, and then continue with new elements (filtered as well). I don't mind if new elements are delayed while previous elements are re-evaluated, but I do care about order.
Is this something that can be done with Reactive Extensions?
Upvotes: 1
Views: 483
Reputation: 117029
Here's a nice extension method that does what you need:
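using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Extension methods must live in a static class; the class name here is arbitrary.
public static class RefilterableExtensions
{
    public static IObservable<T> Refilterable<T>(
        this IObservable<T> source, IObservable<Func<T, bool>> filters)
    {
        return Observable.Create<T>(o =>
        {
            // Buffer every source element so it can be replayed to each new filter.
            var replay = new ReplaySubject<T>();
            var replaySubscription = source.Subscribe(replay);
            // Switch drops the previous filtered query whenever a new filter arrives.
            var query = filters.Select(f => replay.Where(f)).Switch();
            var querySubscription = query.Subscribe(o);
            return new CompositeDisposable(replaySubscription, querySubscription);
        });
    }
}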
I tested this with this code:
var source = new Subject<int>();
var filters = new Subject<Func<int, bool>>();
var subscription = source.Refilterable(filters).Subscribe(x => Console.WriteLine(x));
source.OnNext(1);
source.OnNext(2);
source.OnNext(3);
filters.OnNext(x => x % 2 == 0);
source.OnNext(4);
source.OnNext(5);
filters.OnNext(x => x % 2 == 1);
source.OnNext(6);
filters.OnNext(x => x % 3 == 0);
source.OnNext(7);
filters.OnNext(x => x % 2 == 1);
subscription.Dispose();
filters.OnNext(x => x % 2 == 0);
I got this output:
2 4 1 3 5 3 6 1 3 5 7
Which seems to be what you're after.
I just noticed the requirement to produce lists. Here's an update:
public static IObservable<IList<T>> Refilterable<T>(this IObservable<T> source, IObservable<Func<T, bool>> filters)
{
    return
        Observable
            .Create<IList<T>>(o =>
            {
                var replay = new ReplaySubject<T>();
                var replaySubscription = source.Subscribe(replay);
                var query =
                    filters
                        .Select(f =>
                            replay
                                .Synchronize()
                                .Where(f)
                                .Scan(new List<T>(), (a, x) =>
                                {
                                    a.Add(x);
                                    return a;
                                }))
                        .Switch();
                var querySubscription = query.Subscribe(o);
                return new CompositeDisposable(replaySubscription, querySubscription);
            });
}
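One thing to watch with the list version: Scan does not emit its seed, so a new filter that matches none of the buffered elements produces no list at all, and a subscriber cannot tell that its list should now be empty. Here is a minimal sketch of one possible variation. It assumes the System.Reactive package, and the names RefilterableWithReset and RefilterableSketch are made up for illustration. It uses StartWith to emit an empty list every time the filter changes:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RefilterableSketch
{
    // Hypothetical variant of Refilterable: also emits an empty list on every filter change.
    public static IObservable<IList<T>> RefilterableWithReset<T>(
        this IObservable<T> source, IObservable<Func<T, bool>> filters)
    {
        return Observable.Create<IList<T>>(o =>
        {
            var replay = new ReplaySubject<T>();
            var replaySubscription = source.Subscribe(replay);
            var query = filters
                .Select(f => replay
                    .Synchronize()
                    .Where(f)
                    .Scan(new List<T>(), (a, x) => { a.Add(x); return a; })
                    // StartWith runs before the replayed elements, signalling "list cleared".
                    .StartWith(new List<T>()))
                .Switch();
            var querySubscription = query.Subscribe(o);
            return new CompositeDisposable(replaySubscription, querySubscription);
        });
    }
}

public static class RefilterableSketchDemo
{
    public static void Main()
    {
        var source = new Subject<int>();
        var filters = new Subject<Func<int, bool>>();
        using (source.RefilterableWithReset(filters)
                     .Subscribe(list => Console.WriteLine("[" + string.Join(", ", list) + "]")))
        {
            source.OnNext(1);
            source.OnNext(2);
            filters.OnNext(x => x % 2 == 0); // empty list first, then the replayed matches
            source.OnNext(4);
            filters.OnNext(x => x > 10);     // nothing matches: only the empty list is emitted
        }
    }
}
```

As in the original, the list passed to the subscriber is the same instance being mutated by Scan, so copy it if you need a stable snapshot.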
The only other thing I noticed was the VB.NET tag. I'll see if I can convert later if need be.
This should be the right VB:
' Extension methods in VB must be declared in a Module, so the function is not marked Shared.
<System.Runtime.CompilerServices.Extension> _
Public Function Refilterable(Of T)(source As IObservable(Of T), filters As IObservable(Of Func(Of T, Boolean))) As IObservable(Of IList(Of T))
    Return Observable.Create(Of IList(Of T))( _
        Function(o)
            Dim replay = New ReplaySubject(Of T)()
            Dim replaySubscription = source.Subscribe(replay)
            Dim query = filters.[Select](Function(f) replay.Synchronize().Where(f).Scan(New List(Of T)(), _
                Function(a, x)
                    a.Add(x)
                    Return a
                End Function)).Switch()
            Dim querySubscription = query.Subscribe(o)
            Return New CompositeDisposable(replaySubscription, querySubscription)
        End Function)
End Function
Upvotes: 3
Reputation: 10783
I have two initial reactions. The first is that this is just a Replay() that is resubscribed to when the filter condition changes.
To address my first thought, you could just have this code:
public class Foo
{
    private readonly IDisposable _replayConnection;
    private readonly IConnectableObservable<int> _replaySource;
    private readonly SerialDisposable _subscription = new SerialDisposable();
    private readonly List<int> _values = new List<int>();

    // The constructor, or some initialise method...
    public Foo(IObservable<int> source)
    {
        _replaySource = source.Replay();
        _replayConnection = _replaySource.Connect();
    }

    public void SetFilter(Func<int, bool> predicate)
    {
        // Not thread safe. If required, then a scheduler can solve that.
        // Assigning to a SerialDisposable disposes the previous subscription.
        _values.Clear();
        _subscription.Disposable = _replaySource.Where(predicate)
            .Subscribe(value => _values.Add(value),
                       ex => { /* Do something here to handle errors */ },
                       () => { /* Do something here if you want to cater for completion of the sequence */ });
    }
}
However, this just makes me more concerned about point 2. If you are expecting millions of values, then even if they are just ints, you will be using roughly 4 MB of memory per million items (4 bytes each), before any buffer overhead. Because ints have copy-by-value semantics, you will get copies in both the Replay buffer and your final list (IIRC). If this kind of memory pressure is OK, then I think the Replay code above will be fine. Also note that this Replay usage will throw if you try to buffer more than int.MaxValue values.
Upvotes: 1
Reputation: 3488
Unless I missed something crucial, the specified requirements eliminate the need for immutable collections and simplify the implementation because you need to buffer all emitted values.
private List<T> values = new List<T>();
private IObservable<T> _valueSource;

public List<T> ValidValues => values.Where(MatchesCriteria).ToList();

private void StartSubscriptions()
{
    var addNewValuesSub = _valueSource.Subscribe(values.Add); // TODO: dispose of this subscription
}
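To make the idea above concrete, here is a minimal self-contained sketch that uses only the base class library. The class name RefilterableBuffer, the SetCriteria method and the lock are assumptions added for illustration, not part of the snippet above. It buffers every value in arrival order and re-applies the current criteria each time the list is read:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: buffers every value and re-evaluates the criteria on read.
// Implementing IObserver<T> lets it be passed straight to IObservable<T>.Subscribe.
public sealed class RefilterableBuffer<T> : IObserver<T>
{
    private readonly object _gate = new object();
    private readonly List<T> _values = new List<T>();
    private Func<T, bool> _criteria = _ => true; // accept everything until a filter is set

    public void OnNext(T value) { lock (_gate) _values.Add(value); }
    public void OnError(Exception error) { /* error handling omitted in this sketch */ }
    public void OnCompleted() { }

    public void SetCriteria(Func<T, bool> criteria) { lock (_gate) _criteria = criteria; }

    // Filters the whole buffer on demand; List<T> enumeration preserves arrival order.
    public List<T> ValidValues
    {
        get { lock (_gate) return _values.Where(_criteria).ToList(); }
    }
}

public static class RefilterableBufferDemo
{
    public static void Main()
    {
        var buffer = new RefilterableBuffer<int>();
        foreach (var x in new[] { 1, 2, 3, 4, 5 }) buffer.OnNext(x);

        buffer.SetCriteria(x => x % 2 == 0);
        Console.WriteLine(string.Join(" ", buffer.ValidValues)); // 2 4

        buffer.OnNext(6);
        buffer.SetCriteria(x => x % 3 == 0);
        Console.WriteLine(string.Join(" ", buffer.ValidValues)); // 3 6
    }
}
```

The trade-off is that every read re-scans the whole buffer, which is simple and keeps order, but costs O(n) per read.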
If you believe that IEnumerable.Where is too slow, and all possible criteria are known beforehand, we could use GroupBy to separate values into their respective Observables / data structures. It would look something like this:
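_valueSource.GroupBy(CriteriaSelector)
    // Key is a property of the group; each group is itself an observable of its values.
    .Subscribe(group => group.Subscribe(value => UpdateDataStructure(group.Key, value)));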
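As a rough illustration of what the per-criteria data structures could look like, here is a sketch that uses only the base class library. CriteriaBuckets and ValuesFor are hypothetical names, and the sketch assumes each value belongs to exactly one criterion key. Each incoming value is routed into a bucket when it arrives, so a lookup by key needs no re-filtering:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: keeps one ordered bucket per criterion key.
// Not thread safe; add locking if values arrive on multiple threads.
public sealed class CriteriaBuckets<TKey, T> : IObserver<T>
{
    private readonly Func<T, TKey> _criteriaSelector;
    private readonly Dictionary<TKey, List<T>> _buckets = new Dictionary<TKey, List<T>>();

    public CriteriaBuckets(Func<T, TKey> criteriaSelector)
    {
        _criteriaSelector = criteriaSelector;
    }

    public void OnNext(T value)
    {
        var key = _criteriaSelector(value);
        if (!_buckets.TryGetValue(key, out var bucket))
        {
            bucket = new List<T>();
            _buckets[key] = bucket;
        }
        bucket.Add(value); // arrival order is preserved within each bucket
    }

    public void OnError(Exception error) { /* error handling omitted in this sketch */ }
    public void OnCompleted() { }

    // Returns the values seen so far for a key, or an empty list for an unknown key.
    public IReadOnlyList<T> ValuesFor(TKey key) =>
        _buckets.TryGetValue(key, out var bucket) ? bucket : (IReadOnlyList<T>)Array.Empty<T>();
}

public static class CriteriaBucketsDemo
{
    public static void Main()
    {
        var buckets = new CriteriaBuckets<string, int>(x => x % 2 == 0 ? "even" : "odd");
        foreach (var x in new[] { 1, 2, 3, 4, 5 }) buckets.OnNext(x);

        Console.WriteLine(string.Join(" ", buckets.ValuesFor("even"))); // 2 4
        Console.WriteLine(string.Join(" ", buckets.ValuesFor("odd")));  // 1 3 5
    }
}
```

This trades memory and up-front work for constant-time lookups, and it only helps when the set of criteria is fixed in advance.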
IObservable is not good for buffering lots of values that should later be accessed by some criteria. That's a job for IEnumerable.
Finally, if you think memory usage will be an issue, consider refactoring values as a memory object caching system.
Upvotes: 1