Cristiano
Cristiano

Reputation: 1813

How to chain multiple concurrent rx-java Single

I'm learning Rx-Java2 with Vert.x and I would like to chain a success configuration retrieving with some parallel tasks.

I've created a method that search for the configuration and returns a Single subscribe to it and it worked fine. But I'm in doubt where and how do call the subsequent tasks:

   public void start(Future<Void> startFuture) throws Exception {
      Single<JsonObject> configSingle = prepareConfigurationAsync();
      configSingle.subscribe(onSuccess -> {
        System.out.println(onSuccess);
        -->   Single<Boolean> task1 = prepareLongAsyncTask1(onSuccess).subscribe(...); 
        -->   Completable task2 = prepareLongAsyncTask2(onSuccess)..subscribe(...); 

    }, onError -> {
        startFuture.fail(onError);
    }));

The way I did seems to be working, but without parallelism. how could I achieve it ?

How and where should I dispose those subscriptions ?

Upvotes: 0

Views: 923

Answers (1)

akarnokd
akarnokd

Reputation: 69997

Continuing with some other source is usually done via flatMap. Doing things in parallel is often done with zip or merge. In your case, I don't think you need the value of the inner Single as part of the output so you can try this:

 Completable config = prepareConfigurationAsync()
     .flatMapCompletable(success ->
         System.out.println(success);
         return Completable.mergeArray (
             prepareLongAsyncTask1(success)
                 .doOnSuccess(innerSuccess -> /* ... */)
                 .toCompletable(),
             prepareLongAsyncTask2(success)
                 .doOnComplete(() ->  /* ... */)
         )
      );

  config
  .subscribe( () -> /* completed */, error -> /* error'd */);

Upvotes: 3

Related Questions