Reputation: 2555
The context is using Couchbase to implement a REST CRUD service on a 2-level document store. The data model is an index document pointing to zero or more item documents. The index document is retrieved as an Observable using an asynchronous get. This is followed by a .flatMap() that retrieves zero or more IDs for each item document. The async get returns an Observable, so the Observable I'm now creating is an Observable<Observable<JsonDocument>>. I want to chain a .merge() operator that will take "an Observable that emits Observables, and will merge their output into the output of a single Observable", to quote the ReactiveX documentation :) Then I will .subscribe() to that single Observable to retrieve item documents. The .merge() operator has many signatures, but I can't figure out how to use it in a chain of operators as follows:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
EDIT:
You probably guessed I'm a reactive newbie. The answer from @akarnokd helped me realise what I was trying to do was dumb. The solution is to merge the emissions from the items Observable<Observable<JsonDocument>> inside the document closure and return the result of that. This emits the resulting JsonDocuments from the flatMap:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
Tested and works :)
Upvotes: 0
Views: 777
Reputation: 4472
You can call toList() to collect all emitted items into one list. I've not tested it, but what about something like this:
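bucket.async()
.get(id)
// emit each entry of the "item" array (JsonArray is Iterable<Object>)
.flatMap(document -> Observable.from((JsonArray) document.content().get("item")))
// extract the item ID the same way the question does
.map(item -> ((JsonObject) item).get("itemid").toString())
// asynchronous get, so that flatMap receives an Observable<JsonDocument>
.flatMap(itemId -> bucket.async().get(itemId))
.toList()
.subscribe(results -> { /* list of documents */ });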
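To see the data flow of this approach without a Couchbase cluster, here is a dependency-free sketch. It is not RxJava: CompletableFuture stands in for Observable, and the INDEX map, the getItem helper and the "doc:" prefix are hypothetical stand-ins for the bucket and its documents. It only illustrates the shape "fetch the index, fetch every item asynchronously, gather the results into one list".

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Stand-in sketch: CompletableFuture and an in-memory map replace RxJava and the bucket.
class TwoLevelLookupSketch {
    // Hypothetical index: index document ID -> item document IDs.
    static final Map<String, List<String>> INDEX =
            Collections.singletonMap("index-1", Arrays.asList("a", "b"));

    // Simulated asynchronous get of a single item document.
    static CompletableFuture<String> getItem(String itemId) {
        return CompletableFuture.supplyAsync(() -> "doc:" + itemId);
    }

    // Fetch the index, start one async get per item ID, then gather the results.
    static CompletableFuture<List<String>> getAllItems(String indexId) {
        return CompletableFuture
                .supplyAsync(() -> INDEX.getOrDefault(indexId, Collections.emptyList()))
                .thenCompose(itemIds -> {
                    List<CompletableFuture<String>> gets = itemIds.stream()
                            .map(TwoLevelLookupSketch::getItem)
                            .collect(Collectors.toList());
                    // allOf completes once every get has completed, so join() below does not block.
                    return CompletableFuture
                            .allOf(gets.toArray(new CompletableFuture[0]))
                            .thenApply(done -> gets.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList()));
                });
    }

    public static void main(String[] args) {
        System.out.println(getAllItems("index-1").join()); // [doc:a, doc:b]
    }
}
```

An index with no items yields an empty list, which corresponds to the "zero or more item documents" case in the question.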
Upvotes: 0
Reputation: 69997
Due to the expressive limits of Java, we can't have a parameterless merge() operator that can be applied to an Observable<Observable<T>>. It would require extension methods such as those in C#. The next best thing is to do an identity flatMap:
// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
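The same idiom appears in java.util.stream, which likewise has no parameterless flatten for a Stream<Stream<T>>. The sketch below is only an analogy that runs without RxJava; the class and method names are made up for illustration. One difference to keep in mind: a sequential Stream keeps encounter order, whereas flatMap on an Observable merges the inner Observables and may interleave their emissions.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: java.util.stream instead of RxJava, so it runs without extra dependencies.
class FlattenSketch {
    // Flattens a stream of streams with an identity flatMap, the same
    // shape as .flatMap(v -> v) on an Observable<Observable<T>>.
    static <T> List<T> flatten(Stream<Stream<T>> nested) {
        return nested.flatMap(inner -> inner).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<Stream<String>> nested = Stream.of(
                Stream.of("item-1", "item-2"),
                Stream.<String>empty(),
                Stream.of("item-3"));
        System.out.println(flatten(nested)); // [item-1, item-2, item-3]
    }
}
```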
Upvotes: 0