supertopi
supertopi

Reputation: 3488

Rx GroupByUntil with Throttling timer rebirth

Rx GroupByUntil documentation states

When a new element with the same key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.

I have a problem with this piece of code.

myObservable.GroupByUntil(
                    selectKey,
                    g => g.Throttle( selectTimer(g.Key),
                    throttlingScheduler))

My throttling timer selectTimer is not called when the group is "reborn" with a new element of same group key. Instead, element groupping continues with the same throttling timer.

Is there a way to refresh throttle timer when an element of same key-value occurs?

Upvotes: 3

Views: 991

Answers (1)

Brandon
Brandon

Reputation: 39182

I believe you are misunderstanding GroupByUntil. It only calls the duration selector when it newly constructs a group. When the observable returned by the duration selector produces a value, then it ends that group. The next time an item arrives with that group key, it will create a new group (and call the duration selector again).

Sounds like your actual issue is with Throttle. You want to know how to adjust the throttle timer on the fly. Try the overload of Throttle which takes a throttle duration selector, which it calls each time a new item arrives. Assuming selectTimer returns a TimeSpan:

g => g.Throttle(item => Observable.Timer(selectTimer(g.Key), throttlingScheduler))

Upvotes: 5

Related Questions