Reputation: 3194
I'm using GroupByUntil to group messages from MSMQ by specific property values, which is working wonderfully. This is the code I'm using:
observable.GroupByUntil(
message => message.Source,
message => message.Body,
message => Observable.Timer(new TimeSpan(0,0,5)) //I thought this was sliding expiration
).Subscribe(HandleGroup);
I mistakenly thought that each time a new message arrived for a given group, that group's durationSelector would restart, essentially waiting for the duration to pass with no new messages before ending the group. I realize that is not the case, and that the durationSelector is going to continue to count down no matter what. What is the best way to achieve a sliding durationSelector for each group as it's being grouped?
Upvotes: 2
Views: 630
Reputation: 14350
Switch is your friend.
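observable.GroupByUntil(
    message => message.Source,
    message => message.Body,
    // Project every element of the group to a fresh 5-second timer;
    // Switch keeps only the newest timer and drops the previous one.
    group => group
        .Select(_ => Observable.Timer(TimeSpan.FromSeconds(5)))
        .Switch()
).Subscribe(HandleGroup);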
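Explanation: every element that arrives in a group is projected to a new 5-second timer, and Switch subscribes only to the most recent timer while unsubscribing from the one before it. The duration observable therefore fires only once 5 seconds have passed with no new messages for that key, which is what ends the group.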
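To see the sliding behaviour in isolation, here is a minimal console sketch. It assumes the System.Reactive package and C# 9 or later; the Message record, the Subject standing in for the MSMQ source, and the 1-second window (shortened from 5 seconds so the demo runs quickly) are all made up for illustration and are not from the question.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical message type standing in for the MSMQ message in the question.
record Message(string Source, string Body);

static class Program
{
    static void Main()
    {
        // Hypothetical in-memory source replacing the real MSMQ observable.
        var messages = new Subject<Message>();
        var window = TimeSpan.FromSeconds(1); // assumption: shortened from 5 s for the demo

        using var subscription = messages
            .GroupByUntil(
                m => m.Source,
                m => m.Body,
                // Each element swaps the pending timer for a fresh one.
                group => group.Select(_ => Observable.Timer(window)).Switch())
            .Subscribe(group =>
                // Collect the group's elements and print them when the group closes.
                group.ToList().Subscribe(bodies =>
                    Console.WriteLine($"{group.Key}: {string.Join(", ", bodies)}")));

        messages.OnNext(new Message("A", "first"));
        Thread.Sleep(600);
        messages.OnNext(new Message("A", "second")); // inside the window, timer restarts
        Thread.Sleep(600);
        messages.OnNext(new Message("A", "third"));  // 1.2 s after "first", still inside the sliding window
        Thread.Sleep(1500);                          // quiet period longer than the window, group closes
    }
}
```

With a plain Observable.Timer(window) as the duration selector, "third" would arrive after the group had already expired and would start a new group. With the Select/Switch version, all three bodies should end up in the same group, because each arrival restarts the countdown.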
Upvotes: 2