quarks
quarks

Reputation: 35276

Sync two asynchronous API call with RxJava

In what way can we sync two asynchronous calls using RxJava? In the example below, the method contentService.listContents which is a API call must first finish before the processSchema method to take place for each schema.

schemaService.listSchema()
    .toObservable()
    .flatMapIterable(schemas -> {
        schemas.forEach(schema -> {
            // async call
            contentService.listContents(schema.getName()).subscribe(contents -> {
                   doSomethingWithThe(contents); 
            });
        });
        // contentService.listContents` must complete first before 
        // processSchema should be called for each schema
        return schemas;
    }).subscribe(schema -> { processSchema(schema); }, 
                 error -> { Console.error(error.getMessage()); });

The problem with the code above the processSchema would not wait for the contentService.listContents since it is async not not synchronized with each other.

Upvotes: 1

Views: 813

Answers (2)

akarnokd
akarnokd

Reputation: 69997

You have to use flatMap to process the schemas and since it is a list, you have to unroll it and flatMap again:

schemaService.listSchema()
.toObservable()
.flatMap(schemas -> 
     Observable.fromIterable(schemas)
     .flatMap(schema -> 
         contentService.listContents(schema.getName())
         .doOnNext(contents -> doSomethingWith(contents))
     )
     // probably you don't care about the inner contents
     .ignoreElements()
     // andThen will switch to this only when the sequence above completes
     .andThen(Observable.just(schemas))
)
.subscribe(
    schema -> processSchema(schema), 
    error -> Console.error(error.getMessage())
);

Note that you haven't defined the return types of the service calls so you may have to use flatMapSingle and doOnSuccess for example.

Upvotes: 1

You are probably looking for flatMap.

From the docs

Continuations

Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.

Dependent

The most typical scenario is to given a value, invoke another service, await and continue with its result:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap for example:

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

Upvotes: 0

Related Questions