Reputation: 1699
I'm learning Rx in .NET and I've got the following requirements:
So given the example data:
using System;
using System.Reactive.Linq;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var results = new[]
            {
                "right", //1
                "left", //2
                "right", //3
                "right", //4
                "left", //5
                "right", //6
                "right", //7
                "right", //8
                "left" //9
            };
            var observable = Observable.Generate(0,
                x => x < results.Length,
                x => x + 1,
                x => results[x],
                x => TimeSpan.FromSeconds(1)).Timestamp();
            observable.Subscribe(...);
            Console.ReadKey();
        }
    }
}
The result should be:
right //1
left //2
right //3
left //5
right //6
right //8
left //9
String 4 has been skipped because it arrives only 1 second after the last "right", and so has string 7. However, string 8 has not been skipped because it arrives 2 seconds after string 6.
Possible solutions:
I tried to use a windowing function to skip entries, but this skips all strings inside the window, even if they don't have the same value:
observable.Publish(ps =>
ps.Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
.SelectMany(x => x.Take(1))).Subscribe(f => Console.WriteLine(f.Value));
I also tried adding timestamps to each value and checking them in a DistinctUntilChanged() EqualityComparer, but this doesn't seem to work as expected either.
Upvotes: 3
Views: 1558
Reputation: 14350
This is trickier than I thought because of the triple case (right, right, right, all one second apart). Using a straight .Zip won't work here.
This is similar to Sentinel's answer, and correctly handles the triple case:
source
.Timestamp()
.Scan((state, item) => state == null || item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) || state.Value != item.Value
? item
: state
)
.DistinctUntilChanged()
.Select(t => t.Value);
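To check this kind of pipeline without waiting on real timers, here is a minimal sketch that feeds it pre-stamped items. The names HoldOffDemo, Filter and Run are made up for illustration, and the timestamps are synthetic, exactly one second apart. It also assumes `>=` rather than `>` for the comparison, so that an exact two-second gap (string 6 to string 8) passes; with real timers the gap is rarely exact, so treat that choice as an assumption rather than a correction.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

static class HoldOffDemo
{
    // Hypothetical helper: same Scan/DistinctUntilChanged shape as above, but on
    // pre-stamped items and with >= so that an exact two-second gap passes.
    public static IObservable<string> Filter(IObservable<Timestamped<string>> source, TimeSpan holdOff) =>
        source
            .Scan((state, item) =>
                item.Value != state.Value || item.Timestamp - state.Timestamp >= holdOff
                    ? item    // new value, or hold-off elapsed: becomes the last emitted item
                    : state)  // duplicate inside the hold-off: repeat the last emitted item
            .DistinctUntilChanged()  // drops the repeats produced by Scan
            .Select(t => t.Value);

    public static IList<string> Run()
    {
        var results = new[] { "right", "left", "right", "right", "left", "right", "right", "right", "left" };
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero); // arbitrary start time

        // Synthetic timestamps, exactly one second apart.
        var source = results
            .Select((value, i) => new Timestamped<string>(value, start.AddSeconds(i)))
            .ToObservable();

        // ToList + Wait blocks until the sequence completes and returns all kept values.
        return Filter(source, TimeSpan.FromSeconds(2)).ToList().Wait();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
        // prints: right, left, right, left, right, right, left
    }
}
```

Because the state carried by Scan is the last item that was actually emitted, string 8 is compared against string 6 rather than against the skipped string 7, which is what makes the triple case work.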
Explanation:
.Timestamp() wraps each message with the timestamp at which it arrives.
.Scan(1 arg): if a duplicate-within-2-seconds comes, it repeats the previous message; otherwise it emits the new message.
.DistinctUntilChanged() strips out duplicate messages, which will occur because .Scan is emitting the old message twice.
.Select removes the timestamp.
Upvotes: 2
Reputation: 3697
I have not tested this code, but you get the general idea.
source
.Select(x => (str: x, timestamp: DateTime.Now))
.Scan((last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
(state, val) =>
(last: (str: state.next.str, timestamp: state.next.timestamp), next: (str: val.str, timestamp: val.timestamp))
)
.Where(x => (x.next.str != x.last.str) || (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
.Select(x=>x.next.str);
Upvotes: 2
Reputation: 33506
Hm.. sounds something like observable.DistinctUntilChanged to detect distinct events, but merged via CombineLatest with observable.Debounce (the debounce operator is named Throttle in Rx.NET) to also get the repeated-after-some-time ones.
That would cover the basics, but I'm not sure what would happen if a different-than-previous item came after a time longer than the debounce interval. Both the DistinctUntilChanged and Debounce operators would pass the item "at the same time", and I'm not sure what CombineLatest would do at that point. There's a chance you will get such an event twice (the same event in a very short period of time), so you'd need to deduplicate once more.
If you are willing to add some kind of timestamps, then there's also a pretty explicit way to do it, although I'm not sure it's really easier.
You should be able to implement the magic ObjectThatIsAlwaysDifferent by simply making a class whose GetHashCode returns 0 and whose Equals returns false.
This should return events either that have different Command than previous one, or that are same as before but occurred after a time longer than hold-off.
Now that I think about it, it should be possible to do this very simply by zipping the current and previous values:
And that should be it. As usual, there's more than one way to do the same thing.
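One possible reading of the zip idea is sketched below; it is an illustration, not necessarily the code this answer had in mind. ZipDemo, Filter, Run and the sentinel item are hypothetical names, the timestamps are synthetic (exactly one second apart), and `>=` is assumed for the hold-off comparison. Note that this version compares each item only with its immediate predecessor, not with the last item that was emitted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

static class ZipDemo
{
    public static IObservable<string> Filter(IObservable<Timestamped<string>> source, TimeSpan holdOff)
    {
        // Hypothetical sentinel so the first real item has a "previous" to pair with.
        var sentinel = new Timestamped<string>("", DateTimeOffset.MinValue);

        // Publish shares one subscription between both Zip inputs.
        return source.Publish(s => s
            .StartWith(sentinel)                  // previous-item stream
            .Zip(s, (prev, cur) => (prev, cur))   // pair each item with its predecessor
            .Where(p => p.cur.Value != p.prev.Value
                     || p.cur.Timestamp - p.prev.Timestamp >= holdOff)
            .Select(p => p.cur.Value));
    }

    public static IList<string> Run()
    {
        var results = new[] { "right", "left", "right", "right", "left", "right", "right", "right", "left" };
        var start = new DateTimeOffset(2020, 1, 1, 0, 0, 0, TimeSpan.Zero); // arbitrary start time

        // Synthetic timestamps, exactly one second apart.
        var source = results
            .Select((value, i) => new Timestamped<string>(value, start.AddSeconds(i)))
            .ToObservable();

        return Filter(source, TimeSpan.FromSeconds(2)).ToList().Wait();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
        // prints: right, left, right, left, right, left
    }
}
```

With the question's data this drops string 8 as well, because string 8 is only one second after string 7 even though string 7 itself was skipped. That is the triple case another answer in this thread points out, so a plain previous/current zip needs extra state to meet the original requirement.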
Upvotes: 0