Reputation: 11559
Here is a fun little RxJava puzzle I am dealing with. Let's say I have an Observable<List<Parent>> infiniteParentListStream
that is infinite, and each Parent
has an Observable<List<Child>> infiniteChildListStream
property which also is infinite.
I want to take all Parent
instances in the emitted List<Parent>
, and consolidate each of their emitted List<Child>
items into a single, whole List<Child>
reflecting all children of all parents.
The fact the Observable<List<Child>> infiniteChildListStream
property in Parent
is infinite is making the toList()
task a little challenging.
public final class NestedInfiniteTest {
private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);
public static void main(String[] args) {
Observable<List<Parent>> infiniteParentListStream = parentSubject
.map(i -> Arrays.asList(new Parent(), new Parent(), new Parent()))
.cache(1);
Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
Observable.from(parentList)
.flatMap(p -> p.getInfiniteChildListStream().flatMap(Observable::from)).toList()
).cache(1);
allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
}
private static final class Parent {
private final Observable<List<Child>> infiniteChildListStream = childSubject
.map(i -> Arrays.asList(new Child(), new Child(), new Child())).cache(1);
public Observable<List<Child>> getInfiniteChildListStream() {
return infiniteChildListStream;
}
}
private static final class Child {
}
}
One workaround solution I found, of course, is to turn the infiniteChildListStream
finite by calling first()
. But this is less than desirable since it no longer updates.
Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
Observable.from(parentList)
.flatMap(p -> p.getInfiniteChildListStream().first().flatMap(Observable::from)).toList()
).cache(1);
I feel like there is a way to manually call Observable.create()
or use flatMap()
tricks to resolve this. Is there a better way to do this and to keep things reactive with the infinite sources? In my real application outside this SSCCE, these observables are infinite because the data sources driving Parent
and Child
could change and emit new values...
I guess the root of my question is how do I take multiple infinite Observable<List<T>>
and consolidate them to a single Observable<List<T>>
?
Upvotes: 2
Views: 751
Reputation: 11559
I think I figured it out by using Observable.combineLatest()
. To enhance testing, I also modified the source observables to create varying List
sizes based on the subject's pushed integer value. This looks like it works beautifully.
public final class NestedInfiniteTest {
private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);
public static void main(String[] args) {
Observable<List<Parent>> infiniteParentListStream = parentSubject
.map(i -> IntStream.range(0,i).mapToObj(val -> new Parent()).collect(Collectors.toList()))
.cache(1);
Observable<List<Child>> allCurrentChildren = infiniteParentListStream.flatMap(parentList ->
Observable.<Observable<List<Child>>>create(s -> {
parentList.stream().map(Parent::getInfiniteChildListStream).forEach(s::onNext);
s.onCompleted();
})
.toList() //List<<Observable<List<Child>>>>
.flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, new FuncN<List<Child>>() {
@Override
public List<Child> call(Object... args) {
ArrayList<Child> list = new ArrayList<>();
for (Object obj : args) {
list.addAll((List<Child>) obj);
}
return list;
}
}))
);
allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
childSubject.onNext(10);
parentSubject.onNext(5);
childSubject.onNext(2);
}
private static final class Parent {
private final Observable<List<Child>> infiniteChildListStream = childSubject
.map(i -> IntStream.range(0, i).mapToObj(val -> new Child()).collect(Collectors.toList())).cache(1);
public Observable<List<Child>> getInfiniteChildListStream() {
return infiniteChildListStream;
}
}
private static final class Child {
}
}
OUTPUT:
WHOLE CHILD LIST SIZE: 1 //parentSubject = 1, childSubject = 1
WHOLE CHILD LIST SIZE: 10 //parentSubject = 1, childSubject = 10
WHOLE CHILD LIST SIZE: 50 //parentSubject = 5, childSubject = 10
WHOLE CHILD LIST SIZE: 2 //parentSubject = 5, childSubject = 2, adjusting
WHOLE CHILD LIST SIZE: 42 //adjusting
WHOLE CHILD LIST SIZE: 34 //adjusting
WHOLE CHILD LIST SIZE: 26 //adjusting
WHOLE CHILD LIST SIZE: 18 //adjusting
WHOLE CHILD LIST SIZE: 10 //parentSubject = 5, childSubject = 2, done!
UPDATE: Created a Transformer to perform this task
public static class CombinedListTransformer<T,R> implements Observable.Transformer<List<T>,List<R>> {
private final Func1<T,Observable<List<R>>> listMapper;
public CombinedListTransformer(Func1<T,Observable<List<R>>> listMapper) {
this.listMapper = listMapper;
}
@Override
public Observable<List<R>> call(Observable<List<T>> sourceList) {
return sourceList.flatMap(sl ->
Observable.from(sl).map(t -> listMapper.call(t)).toList() //List<Observable<List<R>>
.flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, args -> {
ArrayList<R> list = new ArrayList<>();
for (Object obj : args) {
list.addAll((List<R>) obj);
}
return list;
}))
);
}
}
Upvotes: 2