Reputation: 77
I have an endpoint that pushes data to Kafka. I want to respond to the call with an appropriate status code, 2xx or 5xx, depending on whether the Kafka write succeeds or fails. The code snippet is:
    @Path("/prices")
    public class PriceResource {

        @Inject
        @Channel("price-create")
        Emitter<Double> priceEmitter;

        @POST
        @Consumes(MediaType.TEXT_PLAIN)
        public Response addPrice(Double price) {
            priceEmitter.send(Message.of(price)
                    .withAck(() -> {
                        // Called when the message is acked
                        return CompletableFuture.completedFuture(null);
                    })
                    .withNack(throwable -> {
                        // Called when the message is nacked
                        return CompletableFuture.completedFuture(null);
                    }));
            // return appropriate response
            return Response
        }
    }
Now the issue is that the endpoint is responding with the status code before ack or nack callback is executed.
I also tried the sendAndAwait method of MutinyEmitter, but that method returns void, so there is no way to know whether the message was acked or nacked.
Upvotes: 4
Views: 1645
Reputation: 1154
The best way here is to chain async operations, like this:
    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Uni<Response> addPrice(Double price) {
        return Uni.createFrom().completionStage(priceEmitter.send(price))
                .onItem().transform(ignored -> Response.ok().entity("foo").build())
                .onFailure().recoverWithItem(Response.serverError().build());
    }
If you want to work with sync code (I do not recommend):
    @Blocking
    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public Response addPrice(Double price) {
        try {
            Uni.createFrom().completionStage(priceEmitter.send(price))
                    .await().indefinitely();
            return Response.ok().entity("foo").build();
        } catch (Exception e) {
            return Response.serverError().build();
        }
    }
The .await().indefinitely() call blocks the calling thread until the Uni completes, and throws an exception if the Uni emits a failure.
You also have the option to work directly with the CompletionStage returned by the emitter without converting it to a Uni, but remember Quarkus chose Mutiny as its default reactive framework.
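To illustrate the CompletionStage option, here is a minimal sketch that uses only the JDK. The fakeSend method is a hypothetical stand-in for priceEmitter.send(price), assumed to complete normally on ack and exceptionally on nack, and plain integer status codes stand in for Response objects. In a real resource you would presumably return a CompletionStage<Response> built the same way, provided your REST layer accepts that return type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionStageStatus {

    // Hypothetical stand-in for priceEmitter.send(price): the returned stage
    // completes normally when the write is acked and exceptionally when it is nacked.
    static CompletionStage<Void> fakeSend(double price, boolean acked) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (acked) {
            result.complete(null);
        } else {
            result.completeExceptionally(new IllegalStateException("nacked"));
        }
        return result;
    }

    // Maps the outcome of the send to a status code without blocking:
    // handle() runs on both success and failure, so one callback covers both cases.
    static CompletionStage<Integer> toStatus(CompletionStage<Void> sendResult) {
        return sendResult.handle((ignored, failure) -> failure == null ? 200 : 500);
    }

    public static void main(String[] args) {
        // join() is used here only to print the results of the demo.
        System.out.println(toStatus(fakeSend(1.0, true)).toCompletableFuture().join());  // prints 200
        System.out.println(toStatus(fakeSend(1.0, false)).toCompletableFuture().join()); // prints 500
    }
}
```

The handle callback plays the same role as the onItem().transform(...) and onFailure().recoverWithItem(...) pair in the Uni version: the response is only produced once the send has completed one way or the other.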
Upvotes: 4