Reputation: 1595
I need to configure a list of filters on a stream.
The stream initially has no filters, but over time it acquires a list of active filters.
Each filter will have a strict validity period.
A test case follows:
var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0200.Ms(), 'A'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0400.Ms(), 'B'),
ReactiveTest.OnNext(0500.Ms(), 'A'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C') );
// filters
// A between 70ms -> 550ms
// B between 330ms -> 400ms
// modeled as a string observable where:
// the first word is the char to filter
// the second word is the filter duration in milliseconds
var filters = scheduler.CreateColdObservable<string>(
ReactiveTest.OnNext(0070.Ms(), "A 480"),
ReactiveTest.OnNext(0330.Ms(), "B 70")
);
var expected = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C') );
Can you suggest the best Rx solution for this?
P.S. I am using the following extension method:
public static class TickExtensions
{
    // Converts a millisecond count into scheduler ticks.
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}
Upvotes: 0
Views: 110
Reputation: 14350
First, your input should look like this:
var input = scheduler.CreateColdObservable<char>(
ReactiveTest.OnNext(0100.Ms(), '1'),
ReactiveTest.OnNext(0200.Ms(), 'A'),
ReactiveTest.OnNext(0300.Ms(), '2'),
ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line
ReactiveTest.OnNext(0500.Ms(), 'A'),
ReactiveTest.OnNext(0600.Ms(), 'B'),
ReactiveTest.OnNext(0700.Ms(), '5'),
ReactiveTest.OnNext(0800.Ms(), 'A'),
ReactiveTest.OnNext(0900.Ms(), 'C'));
Also, as @LeeCampbell mentioned, your Ms extension method should return type long, not TimeSpan.
Here's the solution I came up with:
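// Parse each "X nnn" filter into (char, duration), then turn it into an inner
// observable that emits the char immediately and completes once the duration has elapsed.
var activeFilters = filters
    .Select(s => s.Split(' '))
    .Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1]))))
    .Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1))
    .MergeCombineLatest(true)       // list of chars whose filters are still active
    .StartWith(new List<char>());   // no filters are active initially

// Pair each input char with the filter list that is current when it arrives,
// then drop the chars that appear in that list.
var output = activeFilters.Publish(_activeFilters =>
        input.Join(_activeFilters,
            _ => Observable.Return(1),   // each input char's window closes immediately
            t => _activeFilters,         // a filter list stays current until the next list is emitted
            (c, filterList) => Tuple.Create(c, filterList)
        )
    )
    .Where(t => !t.Item2.Contains(t.Item1))
    .Select(t => t.Item1);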
var observer = scheduler.CreateObserver<char>();
output.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
expected.Messages,
observer.Messages);
activeFilters is an observable that emits the list of characters currently being filtered. Whenever that list changes, activeFilters emits a new list. Note that since the same character can have multiple filters, the list may contain duplicates.
Once you can figure out at any given time which filters are active, you can join that list to the input.
The code requires this extension method, which uses the System.Collections.Immutable NuGet package:
public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted)
{
    return outer
        // Tag every notification with the index of the inner observable it came from.
        .SelectMany((inner, i) => inner
            .Materialize()
            .SelectMany(nt => nt.Kind == NotificationKind.OnNext
                ? Observable.Return(Tuple.Create(i, nt.Value, true))            // set the latest value
                : nt.Kind == NotificationKind.OnCompleted
                    ? removeCompleted
                        ? Observable.Return(Tuple.Create(i, default(T), false)) // remove the entry
                        : Observable.Empty<Tuple<int, T, bool>>()               // keep the last value
                    : Observable.Throw<Tuple<int, T, bool>>(nt.Exception)       // propagate errors
            )
        )
        // Keep the latest value per inner observable, keyed by its index.
        .Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1))
        .Select(dict => dict.Values.ToList());
}
MergeCombineLatest takes an observable of observables and emits a list of the latest values from each of the child observables. If removeCompleted is true, then when a child observable completes, its value is removed and the list shrinks by one. If removeCompleted is false, the child's last value stays in that list and in every subsequent list.
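To make the removeCompleted behaviour concrete, here is a minimal sketch that drives MergeCombineLatest by hand. It assumes the extension method above is compiled into the same project and that the System.Reactive package is referenced; the subject names (outer, a, b) are purely illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Assumption: the MergeCombineLatest extension method shown above is in scope.
var outer = new Subject<IObservable<char>>();
var a = new Subject<char>();
var b = new Subject<char>();

outer.MergeCombineLatest(true)
     .Subscribe(list => Console.WriteLine("[" + string.Join(",", list) + "]"));

outer.OnNext(a);   // nothing is printed yet: a has not produced a value
a.OnNext('A');     // prints [A]
outer.OnNext(b);
b.OnNext('B');     // prints a list holding both A and B (ImmutableDictionary does not document an ordering)
a.OnCompleted();   // prints [B], because removeCompleted is true
```

With removeCompleted set to false, the final a.OnCompleted() call should print nothing, and 'A' would remain in any list emitted afterwards.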
If there's some friendlier way of doing this, I would be much obliged.
Upvotes: 1