Jasper Siepkes

Reputation: 1405

Is it possible to wait for an event without blocking a thread with Project Reactor?

Is it possible with Project Reactor to wait in a Mono for an event / condition without needing a blocked thread per Mono? With a CompletableFuture I can pull such a thing off, but I can't see how to do it with Project Reactor.

My problem is that I need to correlate requests with responses. The response time varies wildly, and some requests will never get a reply at all and will time out. On the client side a blocked thread per request isn't a problem, but since this is a server application I don't want to end up spawning a thread per request that blocks waiting for a response.

The API looks something like this:

Mono<Response> doRequest(Mono<Request> request);

Since I don't know how to do it with Reactor I will explain how to do it with a CompletableFuture to clarify what I'm looking for. The API would look like this:

CompletableFuture<Response> doRequest(Request request);

When invoked by a caller, the method sends a request to a server containing a correlation ID generated by the method. The caller is returned a CompletableFuture, and the method stores a reference to this CompletableFuture in a map with the correlation ID as key.

There is also a thread (pool) which receives all the responses from the server. When it receives a response, it takes the correlation ID from the response, uses it to look up the original request (i.e. the CompletableFuture) in the map, and calls complete(response) on it.

In this implementation you don't need a blocked thread per request. This is basically more of a Vert.x / Netty way of thinking. I would like to know how to implement such a thing (if possible) with Project Reactor.

EDIT 25-07-2019:

As requested in the comments, to clarify what I'm getting at, below is an example of how I would implement this with CompletableFutures.

I also noticed I made a mistake which might have been rather confusing: in the CompletableFuture example I passed a Mono as an argument. That should have been just a "normal" argument. My apologies, and I hope I didn't confuse people too much with it.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NonBlockingCorrelatingExample {

    /**
     * This example shows how to implement correlating requests with responses without needing a (sleeping)
     * thread per request to wait for the response with the use of {@link CompletableFuture}'s.
     *
     * So the main feat of this example is that a fixed (small) number of threads is used even if one
     * would fire thousands of requests.
     */
    public static void main(String[] args) throws Exception {

        RequestResponseService requestResponseService = new RequestResponseService();

        Request request = new Request();
        request.correlationId = 1;
        request.question = "Do you speak Spanish?";

        CompletableFuture<Response> responseFuture = requestResponseService.doRequest(request);
        responseFuture.whenComplete((response, throwable) -> System.out.println(response.answer));

        // The blocking call here is just so the application doesn't exit until the demo is completed.
        responseFuture.get();
    }

    static class RequestResponseService {

        /** The key in this map is the correlation ID. */
        private final ConcurrentHashMap<Long, CompletableFuture<Response>> responses =  new ConcurrentHashMap<>();

        CompletableFuture<Response> doRequest(Request request) {
            CompletableFuture<Response> responseFuture = new CompletableFuture<>();
            // Register the future before sending so a fast response can always be matched.
            responses.put(request.correlationId, responseFuture);

            doNonBlockingFireAndForgetRequest(request);

            return responseFuture;
        }

        private void doNonBlockingFireAndForgetRequest(Request request) {
            // In my case this is where the request would be published on an MQTT broker (message bus) in a request topic.
            // Right now we will just make a call which will simulate a response message coming in after a while.
            simulateResponses();
        }

        private void processResponse(Response response) {
            // There would usually be a (small) thread pool which is subscribed to the message bus which receives messages
            // in a response topic and calls this method to handle those messages.
            // remove() rather than get(), so handled requests don't accumulate in the map.
            CompletableFuture<Response> responseFuture = responses.remove(response.correlationId);
            if (responseFuture != null) {
                responseFuture.complete(response);
            }
        }

        void simulateResponses() {
            // This is just to make the example work. Not part of the example.
            new Thread(() -> {
                try {
                    // Simulate a delay.
                    Thread.sleep(10_000);

                    Response response = new Response();
                    response.correlationId = 1;
                    response.answer = "Si!";

                    processResponse(response);

                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    static class Request {
        long correlationId;
        String question;
    }

    static class Response {
        long correlationId;
        String answer;
    }

}
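
The example above never gives up on a request that gets no reply, so its future stays in the map forever. Below is a minimal sketch of one way to add a timeout without parking a thread per request, assuming Java 9 or later for CompletableFuture#orTimeout. The class PendingRequests and its methods register, complete and size are hypothetical names made up for this illustration; they are not part of the original code or of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration only: tracks requests that still await a response. */
class PendingRequests<R> {

    /** The key in this map is the correlation ID. */
    private final ConcurrentHashMap<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    /**
     * Registers a request. The returned future completes exceptionally with a
     * TimeoutException if complete() is not called for this ID within the timeout.
     */
    CompletableFuture<R> register(long correlationId, long timeout, TimeUnit unit) {
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        // orTimeout (Java 9+) relies on a shared scheduler thread inside the JDK,
        // so no thread is blocked per request while waiting.
        future.orTimeout(timeout, unit);
        // Whatever the outcome (reply, timeout, cancellation), drop the map entry.
        future.whenComplete((response, error) -> pending.remove(correlationId, future));
        return future;
    }

    /**
     * Called by the response-handling thread(s).
     * Returns false if nobody is waiting for this correlation ID any more.
     */
    boolean complete(long correlationId, R response) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(response);
    }

    /** Number of requests still waiting for a response. */
    int size() {
        return pending.size();
    }
}
```

With something like this, doRequest would return requests.register(request.correlationId, 30, TimeUnit.SECONDS) and processResponse would call requests.complete(response.correlationId, response). A reply that arrives after the timeout simply finds no waiter and is dropped.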

Upvotes: 5

Views: 2694

Answers (1)

Alexander Pankin

Reputation: 3955

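Yes, it is possible. You can use the reactor.core.publisher.Mono#create method to achieve it. It hands you a MonoSink that any thread can complete later, much like calling complete() on a CompletableFuture.

For your example (in addition to your imports, this needs java.util.function.Consumer and reactor.core.publisher.Mono):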

public static void main(String[] args) throws Exception {
    RequestResponseService requestResponseService = new RequestResponseService();

    Request request = new Request();
    request.correlationId = 1;
    request.question = "Do you speak Spanish?";


    Mono<Request> requestMono = Mono.just(request)
            .doOnNext(rq -> System.out.println(rq.question));
    requestResponseService.doRequest(requestMono)
            .doOnNext(response -> System.out.println(response.answer))
            // The blocking call here is just so the application doesn't exit until the demo is completed.
            .block();
}

static class RequestResponseService {
    private final ConcurrentHashMap<Long, Consumer<Response>> responses =
            new ConcurrentHashMap<>();

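    Mono<Response> doRequest(Mono<Request> request) {
        return request.flatMap(rq -> Mono.<Response>create(sink -> {
            // Register the sink before sending, so a fast response can always be matched.
            responses.put(rq.correlationId, sink::success);
            // Clean up once the Mono terminates or is cancelled (e.g. by a downstream timeout()).
            sink.onDispose(() -> responses.remove(rq.correlationId));
            doNonBlockingFireAndForgetRequest(rq).subscribe(v -> {}, sink::error);
        }));
    }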

    private Mono<Void> doNonBlockingFireAndForgetRequest(Request request) {
        return Mono.fromRunnable(this::simulateResponses);
    }

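    private void processResponse(Response response) {
        // remove() rather than get(), so handled requests don't accumulate in the map.
        Consumer<Response> sink = responses.remove(response.correlationId);
        if (sink != null) {
            sink.accept(response);
        }
    }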

    void simulateResponses() {
        // This is just to make the example work. Not part of the example.
        new Thread(() -> {
            try {
                // Simulate a delay.
                Thread.sleep(10_000);

                Response response = new Response();
                response.correlationId = 1;
                response.answer = "Si!";

                processResponse(response);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

Upvotes: 4
