tmn
tmn

Reputation: 11559

RxJava- Consolidating multiple, infinite Observable<List<T>>?

Here is a fun little RxJava puzzle I am dealing with. Let's say I have an Observable<List<Parent>> infiniteParentListStream that is infinite, and each Parent has an Observable<List<Child>> infiniteChildListStream property which also is infinite.

I want to take all Parent instances in the emitted List<Parent>, and consolidate each of their emitted List<Child> items into a single, whole List<Child> reflecting all children of all parents.

The fact the Observable<List<Child>> infiniteChildListStream property in Parent is infinite is making the toList() task a little challenging.

public final class NestedInfiniteTest {

    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    public static void main(String[] args) {


        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(i -> Arrays.asList(new Parent(), new Parent(), new Parent()))
                .cache(1);

        Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
                Observable.from(parentList)
                        .flatMap(p -> p.getInfiniteChildListStream().flatMap(Observable::from)).toList()
        ).cache(1);

        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
    }

    private static final class Parent {
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(i -> Arrays.asList(new Child(), new Child(), new Child())).cache(1);

        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }
    private static final class Child {

    }
}

One workaround solution I found, of course, is to turn the infiniteChildListStream finite by calling first(). But this is less than desirable since it no longer updates.

Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
        Observable.from(parentList)
                .flatMap(p -> p.getInfiniteChildListStream().first().flatMap(Observable::from)).toList()
).cache(1);

I feel like there is a way to manually call Observable.create() or use flatMap() tricks to resolve this. Is there a better way to do this and to keep things reactive with the infinite sources? In my real application outside this SSCCE, these observables are infinite because the data sources driving Parent and Child could change and emit new values...

I guess the root of my question is how do I take multiple infinite Observable<List<T>> and consolidate them to a single Observable<List<T>> ?

Upvotes: 2

Views: 751

Answers (1)

tmn
tmn

Reputation: 11559

I think I figured it out by using Observable.combineLatest(). To enhance testing, I also modified the source observables to create varying List sizes based on the subject's pushed integer value. This looks like it works beautifully.

public final class NestedInfiniteTest {

    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    public static void main(String[] args) {

        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(i -> IntStream.range(0,i).mapToObj(val -> new Parent()).collect(Collectors.toList()))
                .cache(1);

        Observable<List<Child>> allCurrentChildren = infiniteParentListStream.flatMap(parentList ->
                Observable.<Observable<List<Child>>>create(s -> {
                    parentList.stream().map(Parent::getInfiniteChildListStream).forEach(s::onNext);
                    s.onCompleted();
                })
                .toList() //List<<Observable<List<Child>>>>
                .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, new FuncN<List<Child>>() {
                    @Override
                    public List<Child> call(Object... args) {
                        ArrayList<Child> list = new ArrayList<>();
                        for (Object obj : args) {
                            list.addAll((List<Child>) obj);
                        }
                        return list;
                    }
                }))
        );


        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
        childSubject.onNext(10);
        parentSubject.onNext(5);
        childSubject.onNext(2);
    }

    private static final class Parent {
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(i -> IntStream.range(0, i).mapToObj(val -> new Child()).collect(Collectors.toList())).cache(1);

        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }
    private static final class Child {

    }
}

OUTPUT:

WHOLE CHILD LIST SIZE: 1   //parentSubject = 1, childSubject = 1
WHOLE CHILD LIST SIZE: 10  //parentSubject = 1, childSubject = 10
WHOLE CHILD LIST SIZE: 50  //parentSubject = 5, childSubject = 10
WHOLE CHILD LIST SIZE: 2   //parentSubject = 5, childSubject = 2, adjusting
WHOLE CHILD LIST SIZE: 42  //adjusting
WHOLE CHILD LIST SIZE: 34  //adjusting
WHOLE CHILD LIST SIZE: 26  //adjusting
WHOLE CHILD LIST SIZE: 18  //adjusting
WHOLE CHILD LIST SIZE: 10  //parentSubject = 5, childSubject = 2, done!

UPDATE: Created a Transformer to perform this task

public static class CombinedListTransformer<T,R> implements Observable.Transformer<List<T>,List<R>> {

    private final Func1<T,Observable<List<R>>> listMapper;

    public CombinedListTransformer(Func1<T,Observable<List<R>>> listMapper) {
        this.listMapper = listMapper;
    }
    @Override
    public Observable<List<R>> call(Observable<List<T>> sourceList) {
        return sourceList.flatMap(sl ->
            Observable.from(sl).map(t -> listMapper.call(t)).toList() //List<Observable<List<R>>
            .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, args -> {
                ArrayList<R> list = new ArrayList<>();
                for (Object obj : args) {
                    list.addAll((List<R>) obj);
                }
                return list;
            }))
        );
    }
}

Upvotes: 2

Related Questions