NullReference
NullReference

Reputation: 1026

Observable Buffer

I am developing a solution where changes occur at high rate for example for a user. What I need is to record each change, delay notification and then return distinct changes e.g user ids for users that had been changed. I have come up with the following code using rx:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .Buffer(EVENT_BUFFER_DELAY)
    .Where(buffer => buffer.Count > 0)                
    .Subscribe(OnEventBufferProcess);

This seem to work correctly and I get all the values that were added by

userEventSubject.OnNext(userId);

My question is: Can I distinct the changes e.g when having multiple OnNext with same user id value I don't want the resulting buffer to contain duplicates. Of course I can distinct values in the Subscribe handler but I wondered if this can be done on rx level? I tried the distinct method but would still get all the values.

Since all I want is to track changes made during the delay, return to Subscribe handler and start over, do I need to clear the userEventSubject?

Upvotes: 0

Views: 2319

Answers (2)

ds-b
ds-b

Reputation: 361

You could use DistinctUntilChanged extension method:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .DistinctUntilChanged()
    .Buffer(EVENT_BUFFER_DELAY)    
    .Where(buffer => buffer.Count > 0)                
    .Subscribe(OnEventBufferProcess);

Upvotes: 0

TheInnerLight
TheInnerLight

Reputation: 12184

If you use Observable.Distinct method, you'll be checking for distinct IList<int>s which indeed isn't the behaviour that you want.

It seems that you need to apply the distinct to the elements within the buffer.

You can apply this to every buffer using the Observable.Select method:

private Subject<int> userEventSubject= new Subject<int>();
userEventSubject
    .Buffer(EVENT_BUFFER_DELAY)
    .Where(buffer => buffer.Count > 0)
    .Select(buffer => buffer.Distinct()) // distinct elements in each buffer
    .Subscribe(OnEventBufferProcess);

Be aware that whenever you use Distinct() methods, you need to be wary about how you are checking equality, if you are working with reference types - you will likely need to implement some kind of equality comparer. In this case, it's not a problem because you're working with ints.

Upvotes: 4

Related Questions