Reputation: 939
I am using the Rx Distinct operator to filter an external data stream by a certain key within a long-running process.
Will this cause a memory leak, assuming a lot of different keys will be received? How does the Rx Distinct operator keep track of previously received keys?
Should I use GroupByUntil with a duration selector instead?
Upvotes: 3
Views: 1434
Reputation: 329
This may be a controversial tactic, but if you were worried about distinct keys accumulating, and if there was a point in time where this could safely be reset, you could introduce a reset policy using Observable.Switch. For example, we have a scenario where the "state of the world" is reset on a daily basis, so we could reset the distinct observable daily.
    Observable.Create<MyPoco>(
        observer =>
        {
            // Start with a fresh Distinct; Switch below always follows the latest one.
            var distinctPocos = new BehaviorSubject<IObservable<MyPoco>>(pocos.Distinct(x => x.Id));
            var timerSubscription =
                Observable.Timer(
                    new DateTimeOffset(DateTime.UtcNow.Date.AddDays(1)), // next midnight (UTC)
                    TimeSpan.FromDays(1),
                    schedulerService.Default).Subscribe(
                        t =>
                        {
                            Log.Info("Daily reset - resetting distinct subscription.");
                            // A new Distinct starts with an empty set of seen keys.
                            distinctPocos.OnNext(pocos.Distinct(x => x.Id));
                        });
            var pocoSubscription = distinctPocos.Switch().Subscribe(observer);
            return new CompositeDisposable(timerSubscription, pocoSubscription);
        });
However, I do tend to agree with James World's comment above about testing with a memory profiler to check that memory is actually an issue before introducing potentially unnecessary complexity. If you're accumulating 32-bit ints as the key, you'd need many millions of unique items before running into memory issues on most platforms. For example, the raw data for 262,144 32-bit int keys is one megabyte (262,144 × 4 bytes), not counting the overhead of whatever collection holds them. You may well reset the process long before reaching that point, depending on your scenario.
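If a full memory profiler is not to hand, a rough measurement along these lines may be a useful first check. It is only a sketch: it assumes the keys end up in a HashSet<int> (which is what another answer in this thread reports Distinct uses), and GC.GetTotalMemory is approximate, so treat the result as an order-of-magnitude estimate that will vary by runtime and platform.

```csharp
using System;
using System.Collections.Generic;

// Rough measurement only: GC.GetTotalMemory is approximate.
const int keyCount = 262_144; // the key count used in the example above

long before = GC.GetTotalMemory(forceFullCollection: true);

var keys = new HashSet<int>();
for (int i = 0; i < keyCount; i++)
{
    keys.Add(i);
}

long after = GC.GetTotalMemory(forceFullCollection: true);
double bytesPerKey = (after - before) / (double)keyCount;

Console.WriteLine($"{keys.Count} keys, approx. {bytesPerKey:F1} bytes per key");

// Keep the set reachable until after the second measurement.
GC.KeepAlive(keys);
```

The per-key figure should come out above the raw 4 bytes, because the set stores bookkeeping data alongside each key.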
Upvotes: 1
Reputation: 5074
Observable.Distinct uses a HashSet internally. Memory usage will be roughly proportional to the number of distinct keys encountered (AFAIK about 30*n bytes).
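To illustrate why memory grows with the number of keys, a key-based distinct filter can be sketched roughly as below. This is a simplified stand-in, not the actual Rx.NET source; DistinctByKey is a hypothetical name used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Simplified illustration of a key-based distinct filter; NOT the Rx.NET source.
// DistinctByKey is a hypothetical helper name.
IObservable<T> DistinctByKey<T, TKey>(IObservable<T> source, Func<T, TKey> keySelector)
{
    return Observable.Create<T>(observer =>
    {
        // One set per subscription; it only grows while the subscription is alive.
        var seenKeys = new HashSet<TKey>();

        return source.Subscribe(
            item =>
            {
                // HashSet.Add returns false when the key is already present.
                if (seenKeys.Add(keySelector(item)))
                {
                    observer.OnNext(item);
                }
            },
            observer.OnError,
            observer.OnCompleted);
    });
}

var received = new List<string>();
var input = new[] { "a1", "b1", "a2", "c1", "b2" }.ToObservable();

// The key is the first character, so "a2" and "b2" are filtered out.
DistinctByKey(input, s => s[0]).Subscribe(received.Add);

Console.WriteLine(string.Join(", ", received)); // a1, b1, c1
```

Nothing is ever removed from the set in this sketch, so the only way to release the keys is to dispose the subscription and subscribe again.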
GroupByUntil does something really different from Distinct. GroupByUntil (well) groups, whereas Distinct filters the elements of a stream.
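That said, grouping can be combined with Take(1) to get a distinct-like filter whose keys expire, which may be what the question is after. The following is a sketch under assumptions, not a recommendation: the expire subject is a hypothetical manual signal used to keep the example deterministic, where a real pipeline would more likely return something like Observable.Timer(...) from the duration selector.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var source = new Subject<int>();
// Hypothetical manual expiry signal, used here instead of a timer.
var expire = new Subject<Unit>();
var received = new List<int>();

source
    .GroupByUntil(x => x, group => expire) // every open group closes when 'expire' fires
    .SelectMany(group => group.Take(1))    // forward only the first element of each group
    .Subscribe(received.Add);

source.OnNext(1);
source.OnNext(1);            // same key, group still open: dropped
source.OnNext(2);
expire.OnNext(Unit.Default); // closes the groups for keys 1 and 2
source.OnNext(1);            // key 1 counts as new again

Console.WriteLine(string.Join(", ", received)); // 1, 2, 1
```

With this shape, the operator only has to remember the keys whose groups are currently open, at the cost of letting a key through again after its group has expired.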
Not sure about the intended use, but if you just want to filter consecutive identical elements you need Observable.DistinctUntilChanged, which has a memory footprint independent of the number of keys.
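A small comparison may make the difference concrete. This is a minimal sketch with made-up input values, run against a finite in-memory sequence rather than a real external stream.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var input = new[] { 1, 1, 2, 1, 3, 3 };

var distinct = new List<int>();
var distinctUntilChanged = new List<int>();

// Distinct remembers every value seen so far.
input.ToObservable().Distinct().Subscribe(distinct.Add);

// DistinctUntilChanged only compares against the previous value.
input.ToObservable().DistinctUntilChanged().Subscribe(distinctUntilChanged.Add);

Console.WriteLine(string.Join(", ", distinct));             // 1, 2, 3
Console.WriteLine(string.Join(", ", distinctUntilChanged)); // 1, 2, 1, 3
```

Note that the second 1 passes through DistinctUntilChanged because it is not adjacent to the first, so it is only a substitute for Distinct when repeats arrive back to back.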
Upvotes: 5