sonicblis

Reputation: 3194

RX GroupByUntil with sliding Until

I'm using GroupByUntil to group messages from MSMQ that share specific property values, and it is working wonderfully. This is the code I'm using:

observable.GroupByUntil(
    message => message.Source,
    message => message.Body,
    message => Observable.Timer(new TimeSpan(0,0,5)) //I thought this was sliding expiration
).Subscribe(HandleGroup);

I mistakenly thought that each time a new message arrived for a given group, that group's durationSelector would restart, so the group would end only after the full duration had passed with no new messages. I now realize that is not the case: the durationSelector timer keeps counting down no matter how many messages arrive. What is the best way to get a sliding durationSelector for each group while it is being grouped?

Upvotes: 2

Views: 630

Answers (1)

Shlomo

Reputation: 14350

Switch is your friend.

observable.GroupByUntil(
    message => message.Source,
    message => message.Body,
    group => group
        .Select(_ => Observable.Timer(new TimeSpan(0, 0, 5))) // one fresh 5-second timer per message
        .Switch() // listen only to the newest timer
).Subscribe(HandleGroup);

Explanation:

  • For each message, create a timer that fires once after 5 seconds
  • If another message comes along within the same group, drop the old timer, and switch to the new one.
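
To watch the sliding behaviour without waiting on a real clock, here is a minimal sketch that drives the same query with virtual time. The HistoricalScheduler, the tuple-shaped messages, and the closed list are my assumptions for illustration; they are not part of the original code, and it needs the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Virtual clock, so the 5-second timers can be advanced by hand (assumption for the demo).
var scheduler = new HistoricalScheduler();

// Stand-in for the MSMQ message stream (hypothetical message shape).
var messages = new Subject<(string Source, string Body)>();

// Collects one line per group, at the moment the group closes.
var closed = new List<string>();

messages.GroupByUntil(
    message => message.Source,
    message => message.Body,
    group => group
        .Select(_ => Observable.Timer(TimeSpan.FromSeconds(5), scheduler))
        .Switch()
).Subscribe(group =>
    group.ToList().Subscribe(bodies =>
        closed.Add($"{group.Key}: {string.Join(",", bodies)}")));

messages.OnNext(("A", "1"));                 // t = 0s, timer due at 5s
scheduler.AdvanceBy(TimeSpan.FromSeconds(3));
messages.OnNext(("A", "2"));                 // t = 3s, timer restarted, due at 8s
scheduler.AdvanceBy(TimeSpan.FromSeconds(3));
messages.OnNext(("A", "3"));                 // t = 6s, timer restarted, due at 11s
scheduler.AdvanceBy(TimeSpan.FromSeconds(6)); // t = 12s, past the last deadline

foreach (var line in closed)
    Console.WriteLine(line);
```

With the fixed timer from the question, the group would have closed at the 5-second mark and the third message would have opened a new group. With Switch, all three messages should end up in a single group, because the gap between them never reaches 5 seconds.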

Upvotes: 2
