FuzzY

Reputation: 670

groupBy, filter and memory leak in Rx

According to the documentation of groupBy:

Note: A GroupedObservable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedObservables that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like take(0) to them.
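The buffering described in that note can be pictured with a toy model in plain Java. This is only a sketch of the documented behaviour, not RxJava's actual implementation, which is more involved. The names ToyGroup, emit, discard and buffered are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ToyGroupBufferDemo {

    /** Hypothetical stand-in for a group that caches items until someone subscribes. */
    static class ToyGroup<T> {
        private final List<T> buffer = new ArrayList<>();
        private Consumer<T> subscriber;   // null until subscribed
        private boolean discarded;

        /** Called by an imaginary upstream groupBy for each item of this group. */
        void emit(T item) {
            if (discarded) {
                return;                   // consumer signalled it is not interested
            }
            if (subscriber == null) {
                buffer.add(item);         // nobody is listening yet, so cache the item
            } else {
                subscriber.accept(item);  // deliver directly once subscribed
            }
        }

        /** Subscribing replays the cached items and then releases the buffer. */
        void subscribe(Consumer<T> consumer) {
            subscriber = consumer;
            buffer.forEach(consumer);
            buffer.clear();
        }

        /** Models the effect of take(0): the buffer may be dropped. */
        void discard() {
            discarded = true;
            buffer.clear();
        }

        int buffered() {
            return buffer.size();
        }
    }

    public static void main(String[] args) {
        ToyGroup<Integer> ignored = new ToyGroup<>();    // never subscribed, never discarded
        ToyGroup<Integer> discarded = new ToyGroup<>();
        discarded.discard();

        for (int i = 1; i < 10; i += 2) {                // the odd numbers 1, 3, 5, 7, 9
            ignored.emit(i);
            discarded.emit(i);
        }
        System.out.println(ignored.buffered());          // prints 5
        System.out.println(discarded.buffered());        // prints 0
    }
}
```

In this model the ignored group keeps every item it receives, while the discarded one holds nothing, which is the situation the note warns about.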

There's a RxJava tutorial which says:

Internally, every Rx operator does 3 things

  1. It subscribes to the source and observes the values.
  2. It transforms the observed sequence according to the operator's purpose.
  3. It pushes the modified sequence to its own subscribers, by calling onNext, onError and onCompleted.
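Those three steps can be sketched with a toy filter operator in plain Java. This is a minimal model under my own assumptions, not RxJava's code: ToySource, ToyObserver, range, filter and collect are hypothetical names made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ToyOperatorDemo {

    /** Hypothetical observer with the three callbacks named in the tutorial. */
    interface ToyObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical source type: anything that can be subscribed to. */
    interface ToySource<T> {
        void subscribe(ToyObserver<T> observer);
    }

    /** Emits start, start + 1, ..., start + count - 1, then completes. */
    static ToySource<Integer> range(int start, int count) {
        return observer -> {
            for (int i = start; i < start + count; i++) {
                observer.onNext(i);
            }
            observer.onCompleted();
        };
    }

    /** A toy filter operator: wraps the upstream source in a new source. */
    static <T> ToySource<T> filter(ToySource<T> upstream, Predicate<T> predicate) {
        // 1. Subscribe to the source (only when someone subscribes to us).
        return downstream -> upstream.subscribe(new ToyObserver<T>() {
            @Override public void onNext(T value) {
                if (predicate.test(value)) {      // 2. Transform: keep matching values only.
                    downstream.onNext(value);     // 3. Push to our own subscriber.
                }
            }
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() { downstream.onCompleted(); }
        });
    }

    /** Demo helper: collects everything a source emits into a list. */
    static <T> List<T> collect(ToySource<T> source) {
        List<T> out = new ArrayList<>();
        source.subscribe(new ToyObserver<T>() {
            @Override public void onNext(T value) { out.add(value); }
            @Override public void onError(Throwable error) { throw new RuntimeException(error); }
            @Override public void onCompleted() { }
        });
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(filter(range(0, 10), i -> i % 2 == 0)));  // prints [0, 2, 4, 6, 8]
    }
}
```

Note that in this model the operator subscribes only to its upstream source. It inspects the values passing through it but never subscribes to them, even if those values happen to be observables themselves.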

Let's take a look at the following code block which extracts only even numbers from range(0, 10):

Observable.range(0, 10)
        .groupBy(i -> i % 2)
        .filter(g -> g.getKey() % 2 == 0)
        .flatMap(g -> g)
        .subscribe(System.out::println, Throwable::printStackTrace);

My questions are:

  1. Does this mean the filter operator implicitly subscribes to every group produced by groupBy, or only to the outer Observable<GroupedObservable>?

  2. Will there be a memory leak in this case? If so,

  3. How do I properly discard those groups? Should I replace filter with a custom operator that applies take(0) to the group and then returns Observable.empty()? You may ask why I don't just return the result of take(0) directly: it's because filter doesn't necessarily follow right after groupBy, but can be anywhere in the chain and involve more complex conditions.

Upvotes: 5

Views: 1624

Answers (2)

Dave Moten

Reputation: 12087

Your suspicions are correct: to handle the grouped observables properly, each inner observable (g) must be subscribed to. Because filter subscribes only to the outer observable, the groups it rejects are never subscribed to, so filtering groups this way is a bad idea. Instead, do the selection inside flatMap and use ignoreElements to drop the undesired groups.

Observable.range(0, 10)
    .groupBy(i -> i % 2)
    .flatMap(g -> {
        if (g.getKey() % 2 == 0) {
            return g;                   // keep the even group
        } else {
            return g.ignoreElements();  // subscribe to the odd group but drop its items
        }
    })
    .subscribe(System.out::println, Throwable::printStackTrace);

Upvotes: 4

akarnokd

Reputation: 69997

Apart from the memory leak, the current implementation may end up hanging completely due to internal request coordination problems.

Note that with take(0) the group is unsubscribed immediately, so it may be recreated all the time. I'd use ignoreElements instead: it drops the values, so no items reach flatMap, and the group itself won't be recreated over and over.

Upvotes: 5
