Reputation: 11559
I am trying to create my own RxJava operator called groupByUntilChanged(). It will function like groupBy(), but the emissions are assumed to be ordered by key. So the moment the key value changes, it completes the current GroupedObservable and moves on to the next GroupedObservable for the next key.
Here is my work so far. I use the first letter of each String as the key. This seems to work fine until I throw an "A" String at the end.
import rx.Observable;
import rx.functions.Func1;
import rx.observables.GroupedObservable;

public class Test {
    public static void main(String[] args) {
        Observable<String> items =
            Observable.just("Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
        // The key is the first letter of each String.
        Func1<String, String> keyExtractor = s -> s.substring(0, 1);
        items.compose(orderedGroupBy(keyExtractor))
            .flatMap(grp -> grp.toList())
            .subscribe(System.out::println);
    }

    public static <T, K> Observable.Transformer<T, GroupedObservable<K, T>> orderedGroupBy(Func1<T, K> keySelector) {
        return obs -> obs.groupBy(keySelector)
            .map(grp ->
                GroupedObservable.from(grp.getKey(), grp.takeWhile(t -> keySelector.call(t).equals(grp.getKey())))
            );
    }
}
I get this output:
[Alpha, Adam, Apple, Axe]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]
When I really want this:
[Alpha, Adam, Apple]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]
[Axe]
What can I do so that an ordered set of emissions will onComplete() the GroupedObservable when the key changes?
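To make the intended behaviour concrete, the following is a minimal plain-Java sketch of the same grouping over a list. It does not use RxJava at all, and the names `RunGrouping` and `groupRuns` are made up for illustration. A real operator would have to emit and complete each group incrementally instead of building lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Hypothetical helper, not part of RxJava: splits a list into runs of
// consecutive items that share the same key.
class RunGrouping {
    static <T, K> List<List<T>> groupRuns(List<T> items, Function<? super T, ? extends K> keyOf) {
        List<List<T>> runs = new ArrayList<>();
        List<T> current = null;
        K currentKey = null;
        for (T item : items) {
            K key = keyOf.apply(item);
            // Start a new run on the first item or whenever the key changes.
            if (current == null || !Objects.equals(key, currentKey)) {
                current = new ArrayList<>();
                runs.add(current);
                currentKey = key;
            }
            current.add(item);
        }
        return runs;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList(
            "Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
        // Prints one list per run; the trailing "Axe" gets a run of its own.
        groupRuns(items, s -> s.substring(0, 1)).forEach(System.out::println);
    }
}
```

The only state needed is the last key seen and the run currently being filled, which is what an Rx version would have to carry between emissions.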
Upvotes: 1
Views: 148
Reputation: 1960
Such problems are best solved with custom operators: transformations that depend on state (here, the items already processed and their keys) are not a good fit for the reactive approach and often require Subjects. With built-in operators, a (verbose) solution for cold observables can be as follows:
public static <K, T> Observable.Transformer<T, GroupedObservable<K, T>> groupByUntilChanged(Func1<? super T, ? extends K> keyExtractor) {
    return observable -> groupByUntilChanged(observable, keyExtractor);
}

static <K, T> Observable<GroupedObservable<K, T>> groupByUntilChanged(Observable<T> itemsStream,
                                                                       Func1<? super T, ? extends K> keyExtractor) {
    /* keys according to keyExtractor */
    Observable<K> keysStream = itemsStream.distinctUntilChanged(keyExtractor).map(keyExtractor);
    Func1<K, Func1<T, Boolean>> itemByKey = key -> item -> key.equals(keyExtractor.call(item));
    /* predicate functions to match the sub stream specified by the key extractor */
    Observable<Func1<T, Boolean>> itemsByKeyFuncStream = keysStream.map(itemByKey);
    /* stream chunks are processed sequentially, so some kind of state bookkeeping is needed:
       let it be the number of already processed items */
    BehaviorSubject<Integer> skipCountStream = BehaviorSubject.create(0);
    Observable<GroupedObservable<K, T>> groupByUntilChangedStream = itemsByKeyFuncStream.concatMap(takeF ->
            /* skip already processed items, take while the key extractor predicate is true */
            skipCountStream.first().map(count -> itemsStream.skip(count).takeWhile(takeF)))
        .doOnNext(subItems ->
            /* once a group is ready, increase the number of already processed items */
            subItems.count()
                .flatMap(subItemsSize -> skipCountStream.first().map(allSize -> allSize + subItemsSize))
                .subscribe(skipCountStream::onNext))
        /* convert to GroupedObservable */
        .zipWith(keysStream, (obs, key) -> GroupedObservable.from(key, obs));
    return groupByUntilChangedStream;
}
It was tested with:
Observable<String> itemsStream =
    Observable.just("Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
Func1<String, String> keyExtractor = s -> s.substring(0, 1);
Observable<GroupedObservable<String, String>> groupByUntilChangedStream = itemsStream.compose(groupByUntilChanged(keyExtractor));
/* groups starting with "A" */
groupByUntilChangedStream
    .filter(grouped -> grouped.getKey().equals("A"))
    .flatMap(Observable::toList)
    .defaultIfEmpty(Collections.emptyList())
    .subscribe(System.out::println);
And the result was:
[Alpha, Adam, Apple]
[Axe]
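The bookkeeping in the pipeline above (a distinct-until-changed key list, a running skip count, and a take-while per key) may be easier to follow outside Rx. Here is a rough plain-Java analogue over a list. It assumes the source can be re-read from the start for every key, which is why the Rx version is limited to cold observables. `SkipTakeSketch` and its methods are hypothetical names, not RxJava API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

class SkipTakeSketch {
    // Analogue of distinctUntilChanged(keyExtractor).map(keyExtractor).
    static <T, K> List<K> changedKeys(List<T> items, Function<? super T, ? extends K> keyOf) {
        List<K> keys = new ArrayList<>();
        for (T item : items) {
            K key = keyOf.apply(item);
            if (keys.isEmpty() || !Objects.equals(keys.get(keys.size() - 1), key)) {
                keys.add(key);
            }
        }
        return keys;
    }

    // For each key: skip what earlier chunks consumed, then take while the key matches.
    static <T, K> List<List<T>> chunks(List<T> items, Function<? super T, ? extends K> keyOf) {
        List<List<T>> result = new ArrayList<>();
        List<K> keys = changedKeys(items, keyOf);
        int skipCount = 0; // plays the role of skipCountStream
        for (K key : keys) {
            List<T> chunk = new ArrayList<>();
            for (int i = skipCount; i < items.size(); i++) {
                T item = items.get(i);
                if (!Objects.equals(key, keyOf.apply(item))) {
                    break; // takeWhile ends at the first non-matching item
                }
                chunk.add(item);
            }
            skipCount += chunk.size(); // remember how many items are already processed
            result.add(chunk);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList(
            "Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
        chunks(items, s -> s.substring(0, 1)).forEach(System.out::println);
    }
}
```

Each chunk rescans the source from its skip offset, so the cost grows with the number of key changes. That is the price of avoiding a custom operator here.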
Upvotes: 1
Reputation: 12097
Coordinating completion of the GroupedObservable across the groupBy operator is a pretty tricky thing (though synchronous processing in your case may enable other solutions). For this reason, groupBy has an overload that allows you to specify a mapFactory. If you use Guava's CacheBuilder, as per the javadoc on that groupBy overload, you can specify a maximum size of 1 for the map, which gives your desired behaviour:
Func1<String, String> keySelector = s -> s.substring(0, 1);
Func1<String, String> elementSelector = s -> s;
// This groupBy overload expects the factory to return a Map<K, Object>.
Func1<Action1<String>, Map<String, Object>> mapFactory =
    action ->
        CacheBuilder.newBuilder()
            .maximumSize(1)
            // Report the evicted key back to groupBy; the cast covers the case
            // where the compiler infers the key type as Object.
            .removalListener(notification ->
                action.call((String) notification.getKey()))
            .<String, Object>build().asMap();
items.groupBy(keySelector, elementSelector, mapFactory)
    .flatMap(grp -> grp.toList())
    .subscribe(System.out::println);
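The reason a one-entry map helps, as I understand the overload, is that groupBy looks up the group for each key in that map. If the previous key's entry is evicted as soon as a new key arrives, the old group is finished, and a later item with the same key opens a fresh group. The sketch below simulates that idea with a plain `LinkedHashMap`. It is not how RxJava or Guava are implemented, and `EvictingGroupSketch` and `singleEntryMap` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class EvictingGroupSketch {
    // A map that holds at most one entry and reports whatever it evicts,
    // loosely mimicking a cache with maximumSize(1) and a removal listener.
    static <K, V> Map<K, V> singleEntryMap(BiConsumer<K, V> onEvict) {
        return new LinkedHashMap<K, V>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() > 1) {
                    onEvict.accept(eldest.getKey(), eldest.getValue());
                    return true;
                }
                return false;
            }
        };
    }

    static List<List<String>> group(List<String> items) {
        List<List<String>> completed = new ArrayList<>();
        // Evicting an entry stands in for completing that key's group.
        Map<String, List<String>> open = singleEntryMap((key, grp) -> completed.add(grp));
        for (String item : items) {
            String key = item.substring(0, 1);
            List<String> grp = open.get(key);
            if (grp == null) {
                grp = new ArrayList<>();
                open.put(key, grp); // inserting a second key evicts the previous one
            }
            grp.add(item);
        }
        completed.addAll(open.values()); // source finished: complete what is still open
        return completed;
    }

    public static void main(String[] args) {
        group(Arrays.asList("Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge",
                            "Bat", "Gamma", "Gorilla", "Axe"))
            .forEach(System.out::println);
    }
}
```

By the time "Axe" arrives, the first "A" entry has already been evicted, so it lands in a new group instead of joining the earlier one.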
Upvotes: 1