pink spikyhairman

Reputation: 2555

How to merge the output of an Observable that emits Observables without breaking the operator chain?

The context is using Couchbase to implement a REST CRUD service on a 2-level document store. The data model is an index document pointing to zero or more item documents. The index document is retrieved as an Observable using an asynchronous get. This is followed by a .flatMap() that reads the IDs of zero or more item documents from the index and issues an async get for each one. Each async get returns an Observable, so the Observable I'm creating is now an Observable<Observable<JsonDocument>>.

I want to chain a .merge() operator that will take "an Observable that emits Observables, and will merge their output into the output of a single Observable", to quote the ReactiveX documentation :) Then I will .subscribe() to that single Observable to retrieve the item documents. The .merge() operator has many signatures, but I can't figure out how to use it in a chain of operators as follows:

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create an Observable that emits zero or more Observable<JsonDocument>,
        //  i.e. one bucket.async().get() per item ID
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                        itemArray.forEach(
                            (jsonObject) -> {
                                String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                                observer.onNext(
                                    bucket.async().get(itemId)
                                );
                            }
                        );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return items;
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.merge( ???????? )
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

EDIT:

You probably guessed I'm a reactive newbie. The answer from @akarnokd helped me realise what I was trying to do was dumb. The solution is to merge the emissions from the items Observable<Observable<JsonDocument>> inside the document closure and return the result of that. This emits the resulting JsonDocuments from the flatMap:

bucket
.async()
.get(id)
.flatMap(
    document -> {

        JsonArray itemArray = (JsonArray) document.content().get("item");
        //  create an Observable that emits zero or more Observable<JsonDocument>,
        //  i.e. one bucket.async().get() per item ID
        Observable<Observable<JsonDocument>> items =
            Observable.create(observer -> {
                try {
                    if (!observer.isUnsubscribed()) {
                        itemArray.forEach(
                            (jsonObject) -> {
                                String itemId = ((JsonObject)jsonObject).get("itemid").toString();
                                observer.onNext(
                                    bucket.async().get(itemId)
                                );
                            }
                        );
                        observer.onCompleted();
                    }
                } catch (Exception e) {
                    observer.onError(e);
                }
            }
        );          
        return Observable.merge(items);
    },
    throwable -> {
        //  error handling omitted...
        return Observable.empty();
    },
    () -> {
        //  on complete processing omitted...
        return null;
    }
)
.subscribe( 
    nextItem -> {
        //  do something with each item document...
    },
    throwable -> {
        //  error handling omitted...
    },
    () -> {
         //  do something else...
    }
);

Tested and works :)

Upvotes: 0

Views: 777

Answers (2)

Julian Go

Reputation: 4472

You can call toList() to collect all emitted items into one list. I've not tested it but what about something like this:

bucket.async()
  .get(id)
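  // emit each entry of the index document's "item" array
  .flatMap(document -> Observable.from((JsonArray) document.content().get("item")))
  // look up each item document asynchronously by its "itemid"
  .flatMap(item -> bucket.async().get(((JsonObject) item).get("itemid").toString()))
  .toList()
  .subscribe(results -> { /* list of documents */ });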
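
For comparison, here is a JDK-only sketch of the same "fetch the index, fetch every item, collect everything into one list" shape. It is an analogy, not RxJava: CompletableFuture stands in for Observable, and thenCompose plays the role of flatMap. The INDEX and ITEMS maps and the getIndex, getItem and fetchItems methods are hypothetical stand-ins for the Couchbase bucket, not part of its API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class TwoLevelFetchSketch {

    // Hypothetical in-memory stand-ins for the index and item documents.
    static final Map<String, List<String>> INDEX = new HashMap<>();
    static final Map<String, String> ITEMS = new HashMap<>();
    static {
        INDEX.put("idx", Arrays.asList("i1", "i2"));
        ITEMS.put("i1", "first item");
        ITEMS.put("i2", "second item");
    }

    // Hypothetical async get of the index document (a list of item IDs).
    static CompletableFuture<List<String>> getIndex(String id) {
        return CompletableFuture.supplyAsync(() -> INDEX.get(id));
    }

    // Hypothetical async get of a single item document.
    static CompletableFuture<String> getItem(String id) {
        return CompletableFuture.supplyAsync(() -> ITEMS.get(id));
    }

    // Flattening happens inside the thenCompose closure, so the caller
    // receives one future of a list rather than a future of futures.
    static CompletableFuture<List<String>> fetchItems(String indexId) {
        return getIndex(indexId).thenCompose(itemIds -> {
            List<CompletableFuture<String>> gets = itemIds.stream()
                .map(TwoLevelFetchSketch::getItem)
                .collect(Collectors.toList());
            return CompletableFuture
                .allOf(gets.toArray(new CompletableFuture[0]))
                .thenApply(done -> gets.stream()
                    .map(CompletableFuture::join) // already complete, does not block
                    .collect(Collectors.toList()));
        });
    }

    public static void main(String[] args) {
        System.out.println(fetchItems("idx").join()); // prints [first item, second item]
    }
}
```

As with toList(), the caller sees nothing until every item lookup has finished, so this shape fits when the whole result set is needed at once rather than item by item.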

Upvotes: 0

akarnokd

Reputation: 69997

Due to the expressive limits of Java, we can't have a parameterless merge() operator that can be applied on an Observable<Observable<T>>. It would require extension methods such as those in C#.

The next best thing is to do an identity flatMap:

// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
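
The same limitation shows up in the JDK itself, so here is a small sketch using java.util.stream instead of RxJava. A static method can require a nested type in its signature, while an instance method on Stream<T> cannot restrict itself to the case where T is itself a Stream, which leaves the identity flatMap as the in-chain option. FlattenSketch, flatten and nestedIds are hypothetical names made up for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlattenSketch {

    // A static method can demand Stream<Stream<T>> as its argument type.
    static <T> Stream<T> flatten(Stream<Stream<T>> nested) {
        // Identity flatMap: each inner stream is its own mapping result.
        return nested.flatMap(inner -> inner);
    }

    // Hypothetical helper: builds a fresh nested input for each demo,
    // including an empty inner stream.
    static Stream<Stream<String>> nestedIds() {
        return Stream.of(Stream.of("a", "b"), Stream.<String>empty(), Stream.of("c"));
    }

    // Flattening via the static method breaks the fluent chain.
    static List<String> viaStaticMethod() {
        return flatten(nestedIds()).collect(Collectors.toList());
    }

    // Flattening via identity flatMap stays inside the chain.
    static List<String> viaIdentityFlatMap() {
        return nestedIds().flatMap(inner -> inner).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaStaticMethod());    // prints [a, b, c]
        System.out.println(viaIdentityFlatMap()); // prints [a, b, c]
    }
}
```

The static form corresponds to the Observable.merge(items) call used in the question's edit, and the in-chain form corresponds to .flatMap(v -> v).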

Upvotes: 0
