Reputation: 1074
I'm trying to wrap my head around the use cases for the RxJs operator groupBy
and I'm concerned that in certain instances it may lead to a memory leak.
I'm familiar with groupBy in the traditional sense (synchronous list processing, for example). Here is a groupBy function to refer to:
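// Right-to-left function composition. The original snippet did not define compose,
// so this definition is an assumption that lets the snippet run on its own.
const compose = (...fns) => x => fns.reduceRight((acc, fn) => fn(acc), x);

// Groups the items of a list under the key that f returns for each item.
const groupBy = f => list =>
  list.reduce((grouped, item) => {
    const category = f(item);
    if (!(category in grouped)) {
      grouped[category] = [];
    }
    grouped[category].push(item);
    return grouped;
  }, {});

const oddsAndEvens = x => x % 2 === 0 ? 'EVEN' : 'ODD';

compose(
  console.log,
  groupBy(oddsAndEvens)
)([1,2,3,4,5,6,7,8])
// logs: { ODD: [ 1, 3, 5, 7 ], EVEN: [ 2, 4, 6, 8 ] }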
Note that this is stateless in the broader scope. I'm assuming that RxJS does something similar, where in place of EVEN and ODD it returns observables, and that it keeps track of the groups statefully in something that behaves like a set. Correct me if I'm wrong; the main point is that I think RxJS would have to maintain a stateful list of all groupings.
My question is, what happens if the number of grouping values (just EVEN and ODD in this example) is not finite? For example, a stream that gives you a unique identifier to maintain coherence over the life of the stream. If you were to group by this identifier, would RxJS's groupBy operator keep making more and more groups even though old identifiers will never be revisited again?
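To make the concern concrete, here is a minimal sketch of the bookkeeping I have in mind, in plain JavaScript with no RxJS involved. The names makeGrouper, push and groupCount are made up for illustration, and I'm only assuming the real operator does something comparable with one observable per key instead of a counter.

```javascript
// Hypothetical model of a streaming groupBy: one entry per key, kept in a Map.
// This is NOT the RxJS implementation, only an illustration of the bookkeeping.
const makeGrouper = keyOf => {
  const groups = new Map(); // key -> number of items seen for that key

  return {
    push(item) {
      const key = keyOf(item);
      if (!groups.has(key)) {
        groups.set(key, 0); // a new key allocates an entry that is never removed
      }
      groups.set(key, groups.get(key) + 1);
    },
    groupCount: () => groups.size,
  };
};

// Bounded key space: only two groups can ever exist.
const parity = makeGrouper(x => (x % 2 === 0 ? 'EVEN' : 'ODD'));
// Unbounded key space: every item carries a fresh identifier.
const byId = makeGrouper(event => event.id);

for (let i = 1; i <= 1000; i++) {
  parity.push(i);
  byId.push({ id: `session-${i}` });
}

console.log(parity.groupCount()); // 2
console.log(byId.groupCount());   // 1000
```

With the parity key the map stays at two entries no matter how long the stream runs, while with the unique identifier it grows by one entry per item.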
Upvotes: 5
Views: 1013
Reputation: 14687
If your stream is infinite and your Key Selector can produce infinite groups, then - yes, you have a memory leak.
You can set a Duration Selector for every grouped observable. The Duration Selector is created for each group and signals on the expiration of the group.
RxJS 5+: pass it as the third parameter of groupBy.
RxJS 4: use the groupByUntil operator instead.
Here is an example of an infinite stream, where each of the grouped Observables is closed after 3 seconds.
Rx.Observable.interval(200)
  .groupBy(
    x => Math.floor(x / 10),
    x => x,
    x$ => Rx.Observable.timer(3000).finally(() => console.log(`closing group ${x$.key}`))
  )
  .mergeMap(x$ => x$.map(x => `group ${x$.key}: ${x}`))
  .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.8/Rx.js"></script>
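To see why an expiry keeps memory bounded, here is a rough model of the same idea in plain JavaScript, using a logical clock in place of real timers so it runs instantly. It is a sketch and not RxJS code: makeExpiringGrouper, ttl and the `now` argument are hypothetical names, and the timings simply mirror the example above (one value every 200 ms, ten values per key, groups closed 3000 ms after creation).

```javascript
// Hypothetical model of groupBy with a per-group expiry. Not the RxJS implementation.
const makeExpiringGrouper = (keyOf, ttl) => {
  const groups = new Map(); // key -> { expiresAt, count }
  const closed = [];        // keys of closed groups, kept only for inspection

  // Drop every group whose expiry time has been reached.
  const evict = now => {
    for (const [key, group] of groups) {
      if (now >= group.expiresAt) {
        groups.delete(key); // deleting the current entry while iterating a Map is safe
        closed.push(key);
      }
    }
  };

  return {
    push(item, now) {
      evict(now);
      const key = keyOf(item);
      if (!groups.has(key)) {
        // The expiry is fixed when the group is created, like the timer in the example.
        groups.set(key, { expiresAt: now + ttl, count: 0 });
      }
      groups.get(key).count += 1;
    },
    activeCount: () => groups.size,
    closedKeys: () => [...closed],
  };
};

const grouper = makeExpiringGrouper(x => Math.floor(x / 10), 3000);
let maxActive = 0;
for (let x = 0; x < 200; x++) {
  grouper.push(x, x * 200); // value x arrives at logical time x * 200
  maxActive = Math.max(maxActive, grouper.activeCount());
}

console.log(maxActive);                   // 2
console.log(grouper.closedKeys().length); // 19
console.log(grouper.activeCount());       // 1
```

Twenty keys pass through, but no more than two groups are ever held at once, because each group is released once its expiry passes.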
Upvotes: 10
Reputation: 20033
My question is, what happens if the number of grouping values (just EVEN and ODD in this example) is not finite?
That can only happen in infinite streams (as there can't be more groups than values on the source stream). The answer is simple: you will keep creating new observables.
By default (that is, without a duration selector), each GroupedObservable lives exactly as long as the source: groups are completed when the source completes, as described in the docs.
Technically there is no memory leak here since you're actively observing an infinite observable. Once the source observable completes, so will all groups:
source$
  .takeUntil(stop$)
  .groupBy(…)
But in a less technical sense: grouping an infinite observable over a unique property without ever unsubscribing from the source won't do your memory usage a big favor, no.
If you were to group by this identifier, would RxJS's groupBy operator keep making more and more groups even though old identifiers will never be revisited again?
The thing to point out here is that there is nothing rxjs could do about this. It cannot know whether a group is done or whether it will receive another value at some point later on.
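The flip side is that your own code often does know when a key is finished. As a rough illustration in plain JavaScript (not RxJS code), the sketch below lets the caller pass that knowledge in. The names makeClosingGrouper and isLast, and the `type: 'end'` marker on events, are all hypothetical.

```javascript
// Hypothetical sketch: isLast encodes domain knowledge that a key will not be seen again.
const makeClosingGrouper = (keyOf, isLast) => {
  const open = new Map(); // key -> number of items seen for groups that are still open
  let closedCount = 0;    // closed groups are only counted, not retained

  return {
    push(item) {
      const key = keyOf(item);
      open.set(key, (open.get(key) || 0) + 1);
      if (isLast(item)) {
        open.delete(key); // the only point where the entry for this key is released
        closedCount += 1;
      }
    },
    openCount: () => open.size,
    closedCount: () => closedCount,
  };
};

const sessions = makeClosingGrouper(e => e.id, e => e.type === 'end');
for (let i = 0; i < 500; i++) {
  sessions.push({ id: i, type: 'start' });
  sessions.push({ id: i, type: 'end' });
}
// A session that never sends its end marker stays tracked indefinitely.
sessions.push({ id: 'still-running', type: 'start' });

console.log(sessions.openCount());   // 1
console.log(sessions.closedCount()); // 500
```

Only the session without an end marker remains tracked, which is exactly the information the library cannot infer by itself.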
Upvotes: 2