tmn

Reputation: 11559

RxJava - Creating a GroupByUntilChanged Operator?

I am trying to create my own RxJava operator called groupByUntilChanged(). It will function like a groupBy() but the emissions are assumed to be in an order based on the key. So the moment the key value changes, it completes that GroupedObservable and moves on to the next GroupedObservable for the next key.

Here is my work so far. I use the first letter of each String as the key. This seems to work fine until I throw an "A" String at the end.

import rx.Observable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

public class Test {
    public static void main(String[] args) {
        Observable<String> items =
                Observable.just("Alpha","Adam","Apple","Beta","Brick","Bridge","Bat","Gamma","Gorilla","Axe");

        Func1<String,String> keyExtractor = s -> s.substring(0,1);

        items.compose(orderedGroupBy(keyExtractor))
                .flatMap(grp -> grp.toList())
                .subscribe(System.out::println);

    }

    public static <T,K> Observable.Transformer<T,GroupedObservable<K,T>> orderedGroupBy(Func1<T,K> keySelector) {
        return obs -> obs.groupBy(keySelector)
                .map(grp ->
                      GroupedObservable.from(grp.getKey(),grp.takeWhile(t -> keySelector.call(t).equals(grp.getKey())))
                );
    }
}

I get this output:

[Alpha, Adam, Apple, Axe]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]

When I really want this:

[Alpha, Adam, Apple]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]
[Axe]

What can I do so that an ordered set of emissions will onComplete() the GroupedObservable when the key changes?

Upvotes: 1

Views: 148

Answers (2)

m.ostroverkhov

Reputation: 1960

Problems like this are best solved with a custom operator: transformations that depend on state (here, the items already processed and their keys) are not a natural fit for the reactive approach and often require Subjects. With built-in operators only, a (verbose) solution for cold observables could look like this:

public static <K, T> Observable.Transformer<T, GroupedObservable<K, T>> groupByUntilChanged(Func1<? super T, ? extends K> keyExtractor) {
    return observable -> groupByUntilChanged(observable, keyExtractor);
}

static <K, T> Observable<GroupedObservable<K, T>> groupByUntilChanged(Observable<T> itemsStream,
                                                                      Func1<? super T, ? extends K> keyExtractor) {

    /*keys according to keyExtractor */
    Observable<K> keysStream = itemsStream.distinctUntilChanged(keyExtractor).map(keyExtractor);
    Func1<K, Func1<T, Boolean>> itemByKey = key -> item -> key.equals(keyExtractor.call(item));

    /*predicate functions to match sub stream specified by key extractor*/
    Observable<Func1<T, Boolean>> itemsByKeyFuncStream = keysStream.map(itemByKey);

    /*stream chunks are processed sequentially, some kind of state bookkeeping is needed: let it be the number of
      already processed items */
    BehaviorSubject<Integer> skipCountStream = BehaviorSubject.create(0);

    Observable<GroupedObservable<K, T>> groupByUntilChangedStream = itemsByKeyFuncStream.concatMap(takeF ->

            /*skip already processed items, take while key extractor predicate is true*/
            skipCountStream.first().map(count -> itemsStream.skip(count).takeWhile(takeF)))

            .doOnNext(subItems ->
                    /*once group is ready, increase number of already processed items*/
                    subItems.count()
                            .flatMap(subItemsSize -> skipCountStream.first().map(allSize -> allSize + subItemsSize))
                            .subscribe(skipCountStream::onNext))
            /*convert to GroupedObservable*/
            .zipWith(keysStream, (obs, key) -> GroupedObservable.from(key, obs));

    return groupByUntilChangedStream;
}

It was tested with:

Observable<String> itemsStream =
        Observable.just("Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
Func1<String, String> keyExtractor = s -> s.substring(0, 1);
Observable<GroupedObservable<String, String>> groupByUntilChangedStream = itemsStream.compose(groupByUntilChanged(keyExtractor));

/*groups starting with "A"*/
groupByUntilChangedStream
        .filter(grouped -> grouped.getKey().equals("A"))
        .flatMap(Observable::toList)
        .defaultIfEmpty(Collections.emptyList())
        .subscribe(System.out::println);

The result was:

[Alpha, Adam, Apple]
[Axe]
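
The state involved is small: only the key of the group currently being filled. As a rough illustration outside RxJava, the intended chunking can be sketched with plain `java.util` collections. The names `GroupUntilChangedSketch` and `groupUntilChanged` are hypothetical, and this is not an operator implementation, only the grouping rule applied to a list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: splits an ordered list into runs of equal keys.
class GroupUntilChangedSketch {

    static <T, K> List<List<T>> groupUntilChanged(List<T> items,
                                                  Function<? super T, ? extends K> keyExtractor) {
        List<List<T>> groups = new ArrayList<>();
        List<T> current = null;   // group currently being filled
        K currentKey = null;      // key of that group
        for (T item : items) {
            K key = keyExtractor.apply(item);
            // Start a new group on the first item or whenever the key changes.
            if (current == null || !Objects.equals(key, currentKey)) {
                current = new ArrayList<>();
                groups.add(current);
                currentKey = key;
            }
            current.add(item);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList(
                "Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
        // Prints one list per run of equal first letters, with "Axe" in its own trailing group.
        groupUntilChanged(items, s -> s.substring(0, 1)).forEach(System.out::println);
    }
}
```

A custom operator would keep the same two pieces of state, but emit each item into the current group as it arrives and complete that group when the key changes.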

Upvotes: 1

Dave Moten

Reputation: 12097

Coordinating completion of the GroupedObservable across the groupBy operator is pretty tricky (though synchronous processing, as in your case, may enable other solutions). For this reason, groupBy has an overload that lets you specify a mapFactory. If you use Guava's CacheBuilder, as described in the javadoc for that overload, you can give the map a maximum size of 1. Each new key then evicts the previous one, which completes the previous group, and you get the behaviour you want:

Func1<String, String> keySelector = s -> s.substring(0, 1);
Func1<String, String> elementSelector = s -> s;
// The groupBy overload expects the factory to return a Map<K, Object>.
Func1<Action1<String>, Map<String, Object>> mapFactory =
    action ->
        CacheBuilder.newBuilder()
            .maximumSize(1)
            // Report the evicted key back to groupBy; the cast covers the case
            // where the key type is inferred as Object.
            .removalListener(notification ->
                action.call((String) notification.getKey()))
            .<String, Object> build().asMap();
items.groupBy(keySelector, elementSelector, mapFactory)
    .flatMap(grp -> grp.toList())
    .subscribe(System.out::println);
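
To see why a map capped at one entry helps, here is a rough plain-Java simulation that uses neither RxJava nor Guava. `LinkedHashMap.removeEldestEntry` stands in for the Guava removal listener, and the names `SingleEntryMapSketch` and `group` are hypothetical. It only mimics the bookkeeping (one open group at a time, closed when a different key evicts it) and says nothing about how groupBy is implemented internally:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation: a one-entry map whose eviction "completes" the evicted group.
class SingleEntryMapSketch {

    static List<List<String>> group(List<String> items) {
        List<List<String>> completed = new ArrayList<>();
        Map<String, List<String>> open = new LinkedHashMap<String, List<String>>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, List<String>> eldest) {
                // Called after each insertion; keep at most one open group.
                if (size() > 1) {
                    completed.add(eldest.getValue());
                    return true;
                }
                return false;
            }
        };
        for (String item : items) {
            String key = item.substring(0, 1);
            List<String> grp = open.get(key);
            if (grp == null) {
                grp = new ArrayList<>();
                open.put(key, grp); // inserting a new key evicts the previous group
            }
            grp.add(item);
        }
        // End of input completes whatever group is still open.
        completed.addAll(open.values());
        return completed;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList(
                "Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
        group(items).forEach(System.out::println);
    }
}
```

Because the first "A" group has already been evicted by the time "Axe" arrives, "Axe" opens a fresh group instead of joining the earlier one.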

Upvotes: 1
