Daniel Ruiz Pastor

Reputation: 231

Call REST or web services in parallel with Project Reactor?

I would like to know how to make several parallel calls to REST or web services, join their responses, and return the combined result from a @RestController method.

Something similar to the following code, which is built with CompletableFuture, but using Reactor (Flux, Mono).

CompletableFuture<Company> companyCompletableFuture = CompletableFuture.supplyAsync(() -> {
     return  Company.find.where().eq("id", id).findUnique();  
});

CompletableFuture<List<Domain>> domainsCompletableFuture = CompletableFuture.supplyAsync(() -> {
     return Domain.find.where().eq("company_id", id).findList();
});

// combine both futures into one that completes when all the data is available
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(companyCompletableFuture, domainsCompletableFuture);

allDoneFuture.get(); // block until both futures are done
company = companyCompletableFuture.get();
domain = domainsCompletableFuture.get();
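
For comparison, here is a minimal sketch of the same join written without the intermediate blocking get() calls. It uses thenCombine, which is roughly the CompletableFuture counterpart of Reactor's zip. The Company, Domain and CompanyWithDomains classes and the findCompany/findDomains lookups are hypothetical stand-ins for the ORM calls above, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ParallelJoinSketch {

    // Hypothetical stand-ins for the entities used in the question.
    static final class Company {
        final long id;
        final String name;
        Company(long id, String name) { this.id = id; this.name = name; }
    }

    static final class Domain {
        final long companyId;
        final String host;
        Domain(long companyId, String host) { this.companyId = companyId; this.host = host; }
    }

    // Hypothetical holder for the joined result.
    static final class CompanyWithDomains {
        final Company company;
        final List<Domain> domains;
        CompanyWithDomains(Company company, List<Domain> domains) {
            this.company = company;
            this.domains = domains;
        }
    }

    // Hypothetical lookups; a real application would query a database or call a service here.
    static Company findCompany(long id) {
        return new Company(id, "company-" + id);
    }

    static List<Domain> findDomains(long companyId) {
        return Arrays.asList(new Domain(companyId, "example.com"), new Domain(companyId, "example.org"));
    }

    // Starts both lookups asynchronously and combines their results once both have completed.
    static CompletableFuture<CompanyWithDomains> load(long id) {
        CompletableFuture<Company> company = CompletableFuture.supplyAsync(() -> findCompany(id));
        CompletableFuture<List<Domain>> domains = CompletableFuture.supplyAsync(() -> findDomains(id));
        return company.thenCombine(domains, CompanyWithDomains::new);
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the program, so the result can be printed.
        CompanyWithDomains result = load(42L).join();
        System.out.println(result.company.name + " has " + result.domains.size() + " domains");
    }
}
```

The point of the sketch is that the combined future can be passed around as a single value, which is the shape a Mono-based solution would also have.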

Upvotes: 2

Views: 4156

Answers (2)

paul

Reputation: 13471

You can use subscribeOn to run each pipeline on its own thread, and the zip operator to combine the results:

    /**
     * subscribeOn, just like in Rx, makes the whole pipeline run asynchronously, from beginning to end.
     * <p>
     * In this example we create three Flux pipelines and run all of them asynchronously.
     * Once they have finished, we zip the results in the order in which they are passed to the operator.
     */
    @Test
    public void subscribeOn() throws InterruptedException {
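        // Note: Schedulers.newElastic is deprecated in recent Reactor versions in favour of the bounded elastic schedulers.
        Scheduler scheduler = Schedulers.newElastic("thread");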
        Scheduler scheduler1 = Schedulers.newElastic("thread");
        Scheduler scheduler2 = Schedulers.newElastic("thread");

        Flux<String> flux1 = Flux.just("hello ")
                .doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
                .subscribeOn(scheduler);
        Flux<String> flux2 = Flux.just("reactive")
                .doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
                .subscribeOn(scheduler1);
        Flux<String> flux3 = Flux.just(" world")
                .doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
                .subscribeOn(scheduler2);
        Flux.zip(flux1, flux2, flux3)
                .map(tuple3 -> tuple3.getT1().concat(tuple3.getT2()).concat(tuple3.getT3()))
                .map(String::toUpperCase)
                .subscribe(value -> System.out.println("zip result:" + value));
        // Keep the test thread alive long enough for the asynchronous pipelines to complete.
        Thread.sleep(1000);
    }

You can see more Reactor examples here: https://github.com/politrons/reactive

Upvotes: 0

kurt

Reputation: 1550

You can create two Monos from callables and then zip them. If you want the callables to execute in parallel, you also need to explicitly add subscribeOn(Schedulers.parallel()) to each Mono:

Mono<Integer> mono1 = Mono.fromCallable(() -> {
    System.out.println(Thread.currentThread().getName());
    return 123;
}).subscribeOn(Schedulers.parallel());

Mono<Integer> mono2 = Mono.fromCallable(() -> {
    System.out.println(Thread.currentThread().getName());
    return 321;
}).subscribeOn(Schedulers.parallel());

Tuple2<Integer, Integer> result = mono1.zipWith(mono2).block();

System.out.println(result.getT1());
System.out.println(result.getT2());

The output will look like this (the order of the two thread names may vary):

parallel-1
parallel-2
123
321

Upvotes: 5
