Theodosis

Reputation: 964

Quarkus Mutiny Uni/Multi wait for the request response to finish

I can't find a correct solution to this problem and I'm stuck. Let's say I have this method:

    @GET
    @Path("/testingAsync")
    public Uni<List<String>> testingMutiny() {
        List<String> completeList = new ArrayList<>();
        completeList.add("hello");
        completeList.add("RestEasy");

        List<String> finalList = new ArrayList<>();

        completeList.forEach(e -> Uni.createFrom().item(e)
                .onItem().delayIt().by(Duration.ofMillis(10000))
                .map(value -> finalList.add(value.toUpperCase()))
                .subscribe().asCompletionStage());


        return Uni.createFrom().item(finalList);
    }

As you can see, the method is simple: it takes the values from one list and adds them to a second one. The problem appears once you add the delay, .onItem().delayIt().by(Duration.ofMillis(10000)): the method returns an empty list, and the list is only filled in some time later. I wrote this method to simulate a request whose response is delayed.

Let's say you call 2 URLs, each returning its own Uni, and then try to combine the results and return them as one Uni. If one of those 2 URLs is delayed for some reason, the list is returned empty. I don't want that: I want either the complete list or an error if it takes too long.

What is the best approach to that? I understand that calling await() blocks the main thread and throws away the value of using a reactive library, but I still can't find a way to make this work.

EDIT

I have found out that the external URL I call takes about 5 seconds to respond, so I want my code to pause when creating the Uni and continue once I have received an answer from the server. I have seen in their docs (here) that I can also call await().indefinitely(), but then I get: The current thread cannot be blocked: vert.x-eventloop-thread-14. How do I wait for a response from the server?

EDIT 2

I understand that my question doesn't make much sense with strings, so let's say I have the following instead:

    @GET
    @Path("/testingAsync")
    public Uni<List<Car>> testingMutiny() {

        //ALL THIS IS IN A FOR EACH FOR EVERY CAR

            //HIT ENDPOINT GET DOORS
            Uni<List<JsonObjectCar>> carDoorsUni = getDoors(variable1,
                    variable2, variable3);

            //HIT ENDPOINT GET WHEELS
            Uni<List<JsonObjectCar>> carWheelsUni = getWheels(variable1,
                    variable2, variable3);

            //HIT ENDPOINT GET WINDOWS
            Uni<List<JsonObjectCar>> carWindowsUni = getWindows(variable1,
                    variable2, variable3);

            Uni.combine()
                    .all()
                    .unis(carDoorsUni, carWheelsUni, carWindowsUni)
                    .combinedWith((carDoors, carWheels, carWindows) -> {
                        //Check if cardoors is present and set the doors into the car object
                        Optional.of(carDoors)
                                .ifPresent(val -> car.setDoors(val.getDoors()));
                        Optional.of(carWheels)
                                .ifPresent(val -> car.setWheels(val.getWheels()));
                        Optional.of(carWindows)
                                .ifPresent(val -> car.setWindows(val.getWindows()));

                        return car;
                    }).subscribe().with(e-> System.out.println("Okay it worked"));
            
          //END OF FOR EACH 
            
            
            //Return car (Should have been returned with new doors / wheels/ windows but instead its empty)
        return Uni.createFrom().item(car);

    }

As you can see, the code above should hit the endpoints for doors, wheels and windows and set the results on the car variable. What actually happens is that the car is empty: one of those endpoints responds late, so I return the car before those values have been set. I want to update the car object first and only then return it.

Upvotes: 2

Views: 10794

Answers (3)

SidMorad

Reputation: 974

With Multi and Stream we could come up with something like:

    @GET
    @Path("/testingAsync")
    public Multi<String> testingMutiny() {
        List<String> completeList = new ArrayList<>();
        completeList.add("hello");
        completeList.add("RestEasy");

        return Multi.createFrom().items(completeList.stream()).onItem()
                .transformToUni(value -> Uni.createFrom().item(value.toUpperCase()))
                .concatenate();
    }

It is also good to know the difference between merging and concatenating streams.
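
To make that difference concrete, here is a minimal sketch that uses only the JDK's CompletableFuture instead of Mutiny, so treat it as an analogy and not as how Mutiny is implemented. The class name, the call helper and the delays are made up for illustration. Concatenating waits for each inner result before starting the next call, so the input order is kept; merging starts everything at once and records results in the order they complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

final class MergeVsConcatenateSketch {

    // Hypothetical async call: upper-cases the value after delayMillis.
    static CompletableFuture<String> call(String value, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> value.toUpperCase(),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Concatenate-like: each call starts only after the previous one has
    // finished, so the results keep the iteration order of the map.
    static CompletableFuture<List<String>> concatenated(Map<String, Long> delays) {
        CompletableFuture<List<String>> chain =
                CompletableFuture.completedFuture(new ArrayList<>());
        for (Map.Entry<String, Long> e : delays.entrySet()) {
            chain = chain.thenCompose(acc ->
                    call(e.getKey(), e.getValue()).thenApply(item -> {
                        acc.add(item);
                        return acc;
                    }));
        }
        return chain;
    }

    // Merge-like: all calls start at once and each result is recorded as it
    // arrives, so the order follows completion time, not input order.
    static CompletableFuture<List<String>> merged(Map<String, Long> delays) {
        List<String> arrived = new CopyOnWriteArrayList<>();
        CompletableFuture<?>[] all = delays.entrySet().stream()
                .map(e -> call(e.getKey(), e.getValue()).thenAccept(arrived::add))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(all).thenApply(done -> List.copyOf(arrived));
    }
}
```

With an insertion-ordered map holding "slow" at 300 ms followed by "fast" at 50 ms, concatenated should complete with SLOW then FAST, while merged should complete with FAST then SLOW.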

Upvotes: 1

Davide D'Alto

Reputation: 8246

You could rewrite the method like this:

    @GET
    @Path("/testingAsync")
    public Uni<List<String>> testingMutiny() {
        List<Uni<String>> unis = new ArrayList<>();
        List.of("hello", "RestEasy").forEach( e -> {
            unis.add( Uni.createFrom().item( e )
                .onItem().delayIt().by( Duration.ofMillis( 10000 ) ) );
        } );

        return Uni.combine().all().unis( unis )
                .combinedWith( list -> (List<String>) list);
    }

Note that when you write reactive code, you want to avoid .await().indefinitely(). It shouldn't be needed with Quarkus anyway, because Quarkus recognizes the async API and handles the result accordingly.

You also don't need to subscribe to the Uni or Multi when using Quarkus, for the same reason.

Based on my previous example, you can rewrite your use case with endpoints as:

    @GET
    @Path("/testingAsync")
    public Uni<Car> testingMutiny() {
            Uni<List<JsonObjectCar>> carDoorsUni = getDoors(variable1, variable2, variable3);
            Uni<List<JsonObjectCar>> carWheelsUni = getWheels(variable1,variable2, variable3);
            Uni<List<JsonObjectCar>> carWindowsUni = getWindows(variable1,variable2, variable3);

            return Uni.combine()
                    .all()
                    .unis(carDoorsUni, carWheelsUni, carWindowsUni)
                    .combinedWith(list -> {
                        // Result of carDoorsUni
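                        // The single-argument combinator receives a List<?>, so each entry needs a cast
                        List<JsonObjectCar> carDoors = (List<JsonObjectCar>) list.get(0);

                        // Result of carWheelsUni
                        List<JsonObjectCar> carWheels = (List<JsonObjectCar>) list.get(1);

                        // Result of carWindowsUni
                        List<JsonObjectCar> carWindows = (List<JsonObjectCar>) list.get(2);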
                        
                        // Create a car instance with the previous results
                        Car car = createCar(...);

                        // You can also return a list of cars, but you need to change the return type of testingMutiny to Uni<List<Car>>
                        return car;
                    })
                    .invoke( () -> System.out.println("Okay it worked"));
    }
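
The question also asks for an error when the calls take too long. As a rough, library-free sketch of that idea (the JDK's CompletableFuture stands in for Uni here; the class name, the slowUpperCase helper and the timings are hypothetical), the combined result either completes with the full list or fails with a TimeoutException. In Mutiny, the operator to look at for this should be ifNoItem().after(...).fail() applied to the combined Uni.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

final class CombineWithTimeoutSketch {

    // Hypothetical stand-in for a slow remote call: completes with the
    // upper-cased value after delayMillis, without blocking the caller.
    static CompletableFuture<String> slowUpperCase(String value, long delayMillis) {
        return CompletableFuture.supplyAsync(
                () -> value.toUpperCase(),
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Completes with the full list once every call has finished, or fails
    // with a TimeoutException if that takes longer than the given timeout.
    static CompletableFuture<List<String>> combineAll(
            List<String> inputs, long delayMillis, Duration timeout) {
        List<CompletableFuture<String>> calls = inputs.stream()
                .map(v -> slowUpperCase(v, delayMillis))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]))
                // Every call is already complete at this point, so join() does not block.
                .thenApply(ignored -> calls.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
    }
}
```

The caller never sees a half-filled list: the list only exists as the result of the combined future, and a slow call turns into a failure instead of an empty response.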

Upvotes: 5

jponge

Reputation: 653

You return the list immediately, but the asynchronous processing on the Uni is delayed, so the list is still empty when it is returned.

You should return a Uni from the pipeline that you create (see also collect() and toUni() for gathering items into lists) instead of subscribing, collecting the results yourself and re-wrapping them in a Uni.
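
To illustrate why the list comes back empty, here is a small sketch of the two shapes side by side. It deliberately uses the JDK's CompletableFuture instead of Mutiny so that it runs without extra dependencies; the class name, the after helper and the delay are assumptions made for the example, not part of any Quarkus API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

final class ReturnThePipelineSketch {

    // Hypothetical delay standing in for a slow endpoint.
    static Executor after(long millis) {
        return CompletableFuture.delayedExecutor(millis, TimeUnit.MILLISECONDS);
    }

    // Shape of the code in the question: schedule the work, then hand back
    // whatever is in the list right now. No task has run yet, so it is empty.
    static List<String> fireAndForget(List<String> inputs, long delayMillis) {
        List<String> result = new CopyOnWriteArrayList<>();
        inputs.forEach(v -> CompletableFuture.runAsync(
                () -> result.add(v.toUpperCase()), after(delayMillis)));
        return List.copyOf(result); // snapshot at return time
    }

    // Shape suggested here: the list is produced by the pipeline itself, so
    // it cannot be observed before all items have been processed.
    static CompletableFuture<List<String>> asPipeline(List<String> inputs, long delayMillis) {
        List<CompletableFuture<String>> stages = inputs.stream()
                .map(v -> CompletableFuture.supplyAsync(() -> v.toUpperCase(), after(delayMillis)))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[0]))
                .thenApply(done -> stages.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

The first method mirrors the side-effect approach and returns before anything has been added; the second returns the asynchronous result itself, which is the role the returned Uni plays in a Quarkus endpoint.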

Upvotes: 3
