Reputation: 1813
I'm learning Rx-Java2 with Vert.x and I would like to chain a success configuration retrieving with some parallel tasks.
I've created a method that search for the configuration and returns a Single subscribe to it and it worked fine. But I'm in doubt where and how do call the subsequent tasks:
public void start(Future<Void> startFuture) throws Exception {
Single<JsonObject> configSingle = prepareConfigurationAsync();
configSingle.subscribe(onSuccess -> {
System.out.println(onSuccess);
--> Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...);
--> Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...);
}, onError -> {
startFuture.fail(onError);
}));
The way I did seems to be working, but without parallelism. how could I achieve it ?
How and where should I dispose those subscriptions ?
Upvotes: 0
Views: 923
Reputation: 69997
Continuing with some other source is usually done via flatMap
. Doing things in parallel is often done with zip
or merge
. In your case, I don't think you need the value of the inner Single
as part of the output so you can try this:
Completable config = prepareConfigurationAsync()
.flatMapCompletable(success ->
System.out.println(success);
return Completable.mergeArray (
prepareLongAsyncTask1(success)
.doOnSuccess(innerSuccess -> /* ... */)
.toCompletable(),
prepareLongAsyncTask2(success)
.doOnComplete(() -> /* ... */)
)
);
config
.subscribe( () -> /* completed */, error -> /* error'd */);
Upvotes: 3