Reputation: 231
I would like to know how to make several parallel calls to REST or web services, then join the responses and return the combined result from a @RestController endpoint.
Something similar to the following code built with CompletableFuture, but using Reactor (Flux, Mono).
CompletableFuture<Company> companyCompletableFuture = CompletableFuture.supplyAsync(() -> {
return Company.find.where().eq("id", id).findUnique();
});
CompletableFuture<List<Domain>> domainsCompletableFuture = CompletableFuture.supplyAsync(() -> {
return Domain.find.where().eq("company_id", id).findList();
});
// wait for all the data
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(companyCompletableFuture, domainsCompletableFuture);
allDoneFuture.get(); // block until both futures have completed
company = companyCompletableFuture.get();
domain = domainsCompletableFuture.get();
Upvotes: 2
Views: 4156
Reputation: 13471
You can use subscribeOn to run each publisher on its own thread, and the zip operator to combine the results.
/**
* subscribeOn, just like in Rx, makes the pipeline run asynchronously, from the beginning to the end.
* <p>
* In this example we build three Flux pipelines and run all of them asynchronously.
* Once they have finished, we zip the results in the order in which they were passed to the operator.
*/
@Test
public void subscribeOn() throws InterruptedException {
Scheduler scheduler = Schedulers.newElastic("thread");
Scheduler scheduler1 = Schedulers.newElastic("thread");
Scheduler scheduler2 = Schedulers.newElastic("thread");
Flux<String> flux1 = Flux.just("hello ")
.doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
.subscribeOn(scheduler);
Flux<String> flux2 = Flux.just("reactive")
.doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
.subscribeOn(scheduler1);
Flux<String> flux3 = Flux.just(" world")
.doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
.subscribeOn(scheduler2);
Flux.zip(flux1, flux2, flux3)
.map(tuple3 -> tuple3.getT1().concat(tuple3.getT2()).concat(tuple3.getT3()))
.map(String::toUpperCase)
.subscribe(value -> System.out.println("zip result:" + value));
Thread.sleep(1000);
}
You can see more examples of Reactor here: https://github.com/politrons/reactive
Upvotes: 0
Reputation: 1550
You can create two Monos from callables and then zip them. If you want the callables to execute in parallel, you also need to explicitly add subscribeOn(Schedulers.parallel()) to each Mono:
Mono<Integer> mono1 = Mono.fromCallable(() -> {
System.out.println(Thread.currentThread().getName());
return 123;
}).subscribeOn(Schedulers.parallel());
Mono<Integer> mono2 = Mono.fromCallable(() -> {
System.out.println(Thread.currentThread().getName());
return 321;
}).subscribeOn(Schedulers.parallel());
Tuple2<Integer, Integer> result = mono1.zipWith(mono2).block();
System.out.println(result.getT1());
System.out.println(result.getT2());
The output will look like this (the order of the two thread names may vary):
parallel-1
parallel-2
123
321
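To tie this back to the Company/Domain case in the question, here is a minimal sketch of the same zip approach that joins both results into one object instead of a Tuple2. The names ParallelCalls, CompanyView, findCompany and findDomains are hypothetical stand-ins for the real model and lookups, not part of the original code. It also assumes the lookups are blocking calls, which is why it uses Schedulers.boundedElastic() (available since Reactor 3.3) rather than Schedulers.parallel().

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

final class ParallelCalls {

    // Hypothetical response type that joins both results.
    static final class CompanyView {
        final String company;
        final List<String> domains;

        CompanyView(String company, List<String> domains) {
            this.company = company;
            this.domains = domains;
        }
    }

    // Hypothetical blocking lookups standing in for the REST or database calls.
    static String findCompany(long id) {
        return "company-" + id;
    }

    static List<String> findDomains(long id) {
        return Arrays.asList("a.example", "b.example");
    }

    static Mono<CompanyView> loadCompanyView(long id) {
        // Each Mono wraps one blocking call; subscribeOn moves it off the caller's thread.
        Mono<String> company = Mono.fromCallable(() -> findCompany(id))
                .subscribeOn(Schedulers.boundedElastic());
        Mono<List<String>> domains = Mono.fromCallable(() -> findDomains(id))
                .subscribeOn(Schedulers.boundedElastic());

        // zip subscribes to both sources and combines the two values once both have arrived.
        return Mono.zip(company, domains, CompanyView::new);
    }

    public static void main(String[] args) {
        // block() is used here only to print the result in a plain main method.
        CompanyView view = loadCompanyView(42L).block();
        System.out.println(view.company + " " + view.domains);
    }
}
```

In a Spring WebFlux @RestController, the handler method can return the Mono<CompanyView> directly, so there should be no need to call block() there.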
Upvotes: 5