Misterhex

Reputation: 939

Advisable to use Rx Distinct in a long-running process?

I am using the Rx Distinct operator to filter an external data stream based on a certain key within a long-running process.

Will this cause a memory leak, assuming a lot of different keys will be received? How does the Rx Distinct operator keep track of previously received keys?

Should I use GroupByUntil with a duration selector instead?

Upvotes: 3

Views: 1434

Answers (2)

Daniel Smith

Reputation: 329

This may be a controversial tactic, but if you were worried about distinct keys accumulating, and if there was a point in time where this could safely be reset, you could introduce a reset policy using Observable.Switch. For example, we have a scenario where the "state of the world" is reset on a daily basis, so we could reset the distinct observable daily.

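```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// pocos (the source IObservable<MyPoco>), schedulerService and Log come from
// the surrounding application and are not shown here.
Observable.Create<MyPoco>(
    observer =>
    {
        // Holds the current Distinct-filtered stream; each new Distinct starts with an empty key set.
        var distinctPocos = new BehaviorSubject<IObservable<MyPoco>>(pocos.Distinct(x => x.Id));

        // Fires at the next UTC midnight, then every 24 hours.
        var timerSubscription =
            Observable.Timer(
                new DateTimeOffset(DateTime.UtcNow.Date.AddDays(1)),
                TimeSpan.FromDays(1),
                schedulerService.Default).Subscribe(
                    t =>
                    {
                        Log.Info("Daily reset - resetting distinct subscription.");
                        // Switch unsubscribes from the previous Distinct (so its key set can be
                        // garbage-collected) and subscribes to this fresh one.
                        distinctPocos.OnNext(pocos.Distinct(x => x.Id));
                    });

        var pocoSubscription = distinctPocos.Switch().Subscribe(observer);

        return new CompositeDisposable(timerSubscription, pocoSubscription);
    });
```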

However, I do tend to agree with James World's comment above about testing with a memory profiler to confirm that memory really is an issue before introducing potentially unnecessary complexity. If you're accumulating 32-bit ints as the key, you'd have many millions of unique items before running into memory issues on most platforms. For example, 262,144 32-bit int keys take up one megabyte of raw key data (the HashSet's own per-entry bookkeeping comes on top of that). Depending on your scenario, you may well restart the process long before then.
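The one-megabyte figure is the raw key payload only, at 4 bytes per 32-bit int; it does not include the HashSet's internal overhead:

$$262144 \times 4\ \text{bytes} = 2^{18} \times 2^{2}\ \text{bytes} = 2^{20}\ \text{bytes} = 1\,048\,576\ \text{bytes} = 1\ \text{MiB}$$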

Upvotes: 1

3dGrabber

Reputation: 5074

Observable.Distinct uses a HashSet internally, so memory usage will be roughly proportional to the number of distinct keys encountered (AFAIK about 30*n bytes).
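To make the memory behaviour concrete, here is a minimal sketch of the idea behind Distinct, written over a plain IEnumerable so it needs only the base class library. It is not Rx's actual source; the names DistinctSketch and FilterDistinct are hypothetical. The point is that every key ever seen stays in the set for as long as the subscription lives.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating the idea behind Distinct (not Rx's real code):
// every key seen so far is kept in a HashSet that never shrinks.
public static class DistinctSketch
{
    public static List<T> FilterDistinct<T, TKey>(
        IEnumerable<T> source, Func<T, TKey> keySelector, out int keysRetained)
    {
        var seen = new HashSet<TKey>();
        var result = new List<T>();
        foreach (var item in source)
        {
            // HashSet<T>.Add returns false when the key is already present,
            // so only the first occurrence of each key is passed through.
            if (seen.Add(keySelector(item)))
                result.Add(item);
        }
        // The retained state equals the number of distinct keys encountered.
        keysRetained = seen.Count;
        return result;
    }
}
```

In a long-running stream the equivalent of `keysRetained` keeps growing with each new key, which is exactly the accumulation the question is worried about.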

GroupByUntil does something quite different from Distinct:
GroupByUntil (well) groups, whereas Distinct filters the elements of a stream.

I'm not sure about the intended use, but if you just want to filter out consecutive identical elements, you need Observable.DistinctUntilChanged, which has a memory footprint independent of the number of keys.
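For contrast, a minimal sketch of the idea behind DistinctUntilChanged, again over a plain IEnumerable and not Rx's actual source (DistinctUntilChangedSketch and FilterConsecutiveDuplicates are hypothetical names). Only the previous key is remembered, so the state stays constant-sized no matter how many different keys flow through.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating the idea behind DistinctUntilChanged
// (not Rx's real code): only the last key is stored, so state is O(1).
public static class DistinctUntilChangedSketch
{
    public static List<T> FilterConsecutiveDuplicates<T, TKey>(
        IEnumerable<T> source, Func<T, TKey> keySelector)
    {
        var comparer = EqualityComparer<TKey>.Default;
        var hasPrevious = false;
        TKey previous = default(TKey);
        var result = new List<T>();
        foreach (var item in source)
        {
            var key = keySelector(item);
            // Pass the item through only if its key differs from the one just before it.
            if (!hasPrevious || !comparer.Equals(previous, key))
            {
                result.Add(item);
                previous = key;
                hasPrevious = true;
            }
        }
        return result;
    }
}
```

Note the semantic difference: for the input 1, 1, 2, 2, 1, 3, 3 this yields 1, 2, 1, 3, because the second run of 1s is not adjacent to the first. Distinct would drop it, so the two operators are only interchangeable when duplicates are known to arrive consecutively.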

Upvotes: 5
