Tobias von Falkenhayn

Reputation: 1699

Rx .NET take first and skip after time interval or condition

I'm learning Rx in .NET and I've got the following requirements:

So given the example data:

using System;
using System.Reactive.Linq;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            var results = new[]
                {
                 "right", //1
                 "left", //2
                 "right", //3
                 "right", //4
                 "left", //5
                 "right", //6
                 "right", //7
                 "right", //8
                 "left" //9
                };

            var observable =  Observable.Generate(0, 
                x => x < results.Length, 
                x => x + 1,
                x => results[x], 
                x => 
                TimeSpan.FromSeconds(1)).Timestamp();

            observable.Subscribe(...);

            Console.ReadKey();
        }        
    }
}

The result should be:

right //1
left //2
right //3
left //5
right //6
right //8
left //9

String 4 is skipped because it arrives only 1 second after the previous "right", and so is string 7. String 8, however, is not skipped because it arrives 2 seconds after string 6.

Possible solutions:

I tried to use a windowing function to skip entries, but this skips all strings, even if they don't have the same value:

observable.Publish(ps =>
           ps.Window(() => ps.Delay(TimeSpan.FromSeconds(2)))
             .SelectMany(x => x.Take(1))).Subscribe(f => Console.WriteLine(f.Value));

I also tried adding a timestamp to each value and checking them in a DistinctUntilChanged() EqualityComparer, but this doesn't seem to work as expected either.

Upvotes: 3

Views: 1558

Answers (3)

Shlomo

Reputation: 14350

This is trickier than I thought because of the triple case (right, right, right all one second apart). Using a straight .Zip won't work here.

This is similar to Sentinel's answer, and correctly handles the triple case:

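source
    .Timestamp()
    // Scan without a seed starts from the first item, so state is the last item that was kept.
    .Scan((state, item) => state == null || item.Timestamp - state.Timestamp > TimeSpan.FromSeconds(2) || state.Value != item.Value
        ? item   // different value, or the 2 seconds have elapsed: keep the new item
        : state  // duplicate within 2 seconds: repeat the kept item
    )
    .DistinctUntilChanged() // drops the repeats that Scan produced
    .Select(t => t.Value);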

Explanation:

  • .Timestamp() wraps each message with the timestamp at which it arrives
  • .Scan(1 arg) repeats the previous message if a duplicate arrives within 2 seconds, otherwise it emits the new message
  • .DistinctUntilChanged() strips out duplicate messages, which will occur because .Scan is emitting the old message twice
  • .Select removes the timestamp.
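To check the keep-or-skip rule without waiting on real timers, here is a minimal sketch that replays the same idea over a plain list. It does not use Rx at all: HoldOffSketch, Filter and the integer Seconds field are hypothetical names made up for this illustration. It also assumes items arrive exactly one second apart and uses >= so that a gap of exactly 2 seconds passes, as the question expects for string 8; with a strict > and exact timestamps that item would be dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, not part of Rx: mirrors Scan + DistinctUntilChanged
// on (value, seconds) pairs instead of a live observable.
public static class HoldOffSketch
{
    public static List<string> Filter(
        IEnumerable<(string Value, int Seconds)> items, int holdOffSeconds)
    {
        var output = new List<string>();
        (string Value, int Seconds)? state = null; // last item that was let through

        foreach (var item in items)
        {
            bool pass = state == null                                        // first item
                || item.Value != state.Value.Value                           // value changed
                || item.Seconds - state.Value.Seconds >= holdOffSeconds;     // hold-off elapsed (assumed inclusive)

            if (pass)
            {
                state = item;          // compare later items against this one
                output.Add(item.Value);
            }
        }

        return output;
    }
}
```

Feeding it the question's nine strings stamped at seconds 1 to 9 with holdOffSeconds = 2 keeps strings 1, 2, 3, 5, 6, 8 and 9. The important detail is that state only moves forward when an item is kept, so string 8 is compared against string 6 rather than against the skipped string 7.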

Upvotes: 2

Sentinel

Reputation: 3697

I have not tested this code, but you get the general idea.

        source
            .Select(x => (str: x, timestamp: DateTime.Now))
            .Scan((last: (str: "", timestamp: DateTime.MinValue), next: (str: "", timestamp: DateTime.MinValue)),
                (state, val) =>
                    (last: (str: state.next.str, timestamp: state.next.timestamp), next: (str: val.str, timestamp: val.timestamp))
                )
            .Where(x => (x.next.str != x.last.str) || (x.next.timestamp - x.last.timestamp) > TimeSpan.FromSeconds(2))
            .Select(x=>x.next.str);

Upvotes: 2

quetzalcoatl

Reputation: 33506

Hm.. sounds something like observable.DistinctUntilChanged to detect distinct events, but merged via CombineLatest with observable.Debounce (the debounce operator is called Throttle in Rx.NET) to also get the repeated-after-some-time....

That would cover the basics, but I'm not sure what would happen if a different-than-previous item came after time-longer-than-debounce.. both source DistinctUntilChanged and Debounce operators would pass the item "at the same time" and I'm not sure what CombineLatest would do at this point. There's a chance you will get such an event twice (the same event in a very short period of time), so you'd need to deduplicate it once more.

If you are willing to add some kind of timestamps, then there's also a pretty explicit way to do it, although I'm not sure if it's really easier..

  • take source events - stream of {command}
  • GroupBy the type of the item - stream of substream of {command}, each substream contains ONLY one type of command
  • apply TimeInterval to each of those sub-streams, you get stream of substream of {command, time-since-last-seen}, each substream contains ONLY one type of command
  • CombineLatest them all, you get stream of {command, time-since-THIS-TYPE-was-last-seen}
  • transform it into {command, null-or-ObjectThatIsAlwaysDifferent} depending on the 'time', if time is less than the hold-off period then NULL, if time is larger than hold-off, then use some magic value that IsAlwaysDifferent
  • DistinctUntilChanged that

You should be able to implement the magic ObjectThatIsAlwaysDifferent by simply making a class whose GetHashCode returns 0 and whose Equals returns false.
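A minimal sketch of such a marker class could look like this. The name AlwaysDifferent is hypothetical, and the sketch assumes that whatever comparer DistinctUntilChanged ends up using calls Equals on the instance.

```csharp
using System;

// Hypothetical marker type: no instance ever compares equal to anything,
// not even to itself, so a value carrying it never looks like a repeat.
public sealed class AlwaysDifferent
{
    public override bool Equals(object obj) => false;

    // A constant hash code is legal; it just makes hash-based lookups slow.
    public override int GetHashCode() => 0;
}
```

Note that this deliberately breaks the usual Equals contract (an object should equal itself), so it is best kept private to the pipeline that needs it.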

This should return events either that have different Command than previous one, or that are same as before but occurred after a time longer than hold-off.

Now thinking about it, it should be possible to do it very simply by zipping the current and previous values:

  • take source stream of {command}
  • add timestamps - {command, timestamp}
  • delay it by 1, remember both source and delayed
  • zip them together, now you get stream of [{command, timestamp}, {prevcommand, prevtimestamp}]
  • filter it with your code:
    • pass when command != prevcommand
    • pass when command == prevcommand && timestamp-prevtimestamp > holdoff
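The steps above can be sketched on a plain list, without Rx, to see what they produce. ZipPreviousSketch, Filter and the integer Seconds field are hypothetical names for this illustration, and letting the first item through unconditionally is an assumption, since it has no predecessor to compare against.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: pairs each item with its immediate predecessor
// ("delay by 1", then zip) and applies the two pass conditions.
public static class ZipPreviousSketch
{
    public static List<string> Filter(
        IReadOnlyList<(string Command, int Seconds)> items, int holdOffSeconds)
    {
        var passed = items.Skip(1)
            .Zip(items, (current, previous) => (current, previous))
            .Where(p => p.current.Command != p.previous.Command
                     || p.current.Seconds - p.previous.Seconds > holdOffSeconds)
            .Select(p => p.current.Command);

        // Assumption: the first item has no predecessor, so it always passes.
        return items.Take(1).Select(i => i.Command).Concat(passed).ToList();
    }
}
```

With the question's nine strings stamped at seconds 1 to 9 and holdOffSeconds = 1, this keeps strings 1, 2, 3, 5, 6 and 9. String 8 is dropped because it is compared against string 7, which is only one second earlier, rather than against string 6, the last one that was actually emitted. So three equal items in a row need the comparison to be made against the last emitted item instead.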

aand that should be it. As usual, more than one way to do the same thing.

Upvotes: 0
