rightjoin

Reputation: 77

Quarkus - Responding with status based on kafka write ack and nack

I have an endpoint that pushes data to Kafka. I want to respond to the call with the appropriate status code, 2xx if the Kafka write succeeds or 5xx if it fails. The code snippet is:

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Response addPrice(Double price) {
        priceEmitter.send(Message.of(price)
            .withAck(() -> {
                // Called when the message is acked
                return CompletableFuture.completedFuture(null);
            })
            .withNack(throwable -> {
                // Called when the message is nacked
                return CompletableFuture.completedFuture(null);
            }));
        // return appropriate response
        // (placeholder: this line runs before the ack or nack callback above)
        return Response.ok().build();
    }
}

The problem is that the endpoint responds with a status code before the ack or nack callback is executed. I also tried the sendAndAwait method of MutinyEmitter, but it returns void, so there is no way to know whether the message was acked or nacked.

Upvotes: 4

Views: 1645

Answers (1)

Vinicius

Reputation: 1154

The best way here is to chain async operations, like this:

@POST
@Consumes(MediaType.TEXT_PLAIN)
public Uni<Response> addPrice(Double price) {
    return Uni.createFrom().completionStage(priceEmitter.send(price))
            .onItem().transform(ignored -> Response.ok().entity("foo").build())
            .onFailure().recoverWithItem(Response.serverError().build());
}

If you prefer synchronous code (which I do not recommend):

@Blocking
@POST
@Consumes(MediaType.TEXT_PLAIN)
public Response addPrice(Double price) {
    try {
        Uni.createFrom().completionStage(priceEmitter.send(price))
                .await().indefinitely();

        return Response.ok().entity("foo").build();
    } catch (Exception e) {
        return Response.serverError().build();
    }
}

The call to .await().indefinitely() throws an exception if the Uni emits a failure, which is why the try/catch can map it to a server error.

You also have the option to work directly with the CompletionStage returned by the emitter without converting it to a Uni, but remember Quarkus chose Mutiny as its default reactive framework.
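
As a rough illustration of that last option, here is a minimal sketch that uses only java.util.concurrent, so it runs without Quarkus. The class AckStatusSketch and the method toStatus are hypothetical names. The stage passed in stands in for whatever priceEmitter.send(price) returns, and the integers 200 and 500 stand in for Response.ok() and Response.serverError().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical helper, not part of Quarkus or SmallRye.
final class AckStatusSketch {

    private AckStatusSketch() {
    }

    // Maps the outcome of a send to a status code.
    // sendResult is assumed to complete normally on ack
    // and exceptionally on nack.
    static CompletionStage<Integer> toStatus(CompletionStage<Void> sendResult) {
        // handle() runs in both cases: failure is null when the stage succeeded.
        return sendResult.handle((ignored, failure) -> failure == null ? 200 : 500);
    }
}
```

In the resource method you would presumably apply the same handle(...) call to the emitter's stage and return the result, assuming your REST layer accepts a CompletionStage as a return type.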

Upvotes: 4
