Reputation: 730
My problem is that I have a Quarkus REST web service which executes some tasks in a blocking, sequential way, and afterwards I want to run a longer-running task without blocking the response. I have not found a working solution.
The web service looks like this (simplified):
import io.vertx.mutiny.core.eventbus.EventBus;

@Path("/booking")
@ApplicationScoped
public class BookingResource {

    @Inject
    EventBus eventBus;

    @POST
    @Path("/{bookingId}/print-tickets/")
    @Produces(MediaType.APPLICATION_JSON)
    public PdfTicket printTickets(@PathParam("bookingId") String bookingId) throws Exception {
        var optBooking = daoBooking.getBookingDetailsById(bookingId);
        //....
        var eventOpt = daoEvent.getEventById(booking.getEventId());
        //...
        PdfTicket pdfTicket = myconverter(optBooking, eventOpt); // Some magic code which I removed here
        // This is the code which should run in the background; the REST call should NOT wait for the result
        if (booking.hasFixedSeatingTickets()) {
            // Mark seats as printed, so the fixed seats are not re-assigned to other guests
            eventBus.requestAndForget("greeting", booking.getBookingId());
        }
        return pdfTicket;
    }
}
I tried plain Java threads, I tried Mutiny, and lastly I tried the Quarkus event bus approach which you can see in the code above. There I send a message to the address "greeting". I consume the event like this:
import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class TestResource {

    private static final Logger LOG = Logger.getLogger(TestResource.class);

    @Inject
    private DaoBooking daoBooking;

    @Inject
    ManagedExecutor executor;

    @ConsumeEvent("greeting")
    public void markSeatsAsPrinted(String bookingId) {
        LOG.info("Received markSeatsAsPrinted event");
        Uni.createFrom().voidItem().invoke(() -> {
            LOG.info("Start long running mark tickets as printed job");
            try {
                daoBooking.markSeatsAsPrinted(bookingId);
            } catch (FileMakerException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            LOG.info("End long running mark tickets as printed job");
        }).emitOn(executor);
    }
}
According to https://quarkus.io/guides/reactive-event-bus, when the consuming method returns void, the calling service should NOT wait for the result.
Now, when I call my REST service, I can see in the log "Received markSeatsAsPrinted event" but never "Start long running mark tickets as printed job". So the code in the Uni block is just not executed.
When I change my code to return a Uni<Boolean>, like this:
@ConsumeEvent("greeting")
public Uni<Boolean> markSeatsAsPrinted(String bookingId) {
    LOG.info("Received markSeatsAsPrinted event");
    return Uni.createFrom().item(() -> {
        LOG.info("Start long running mark tickets as printed job");
        try {
            daoBooking.markSeatsAsPrinted(bookingId);
        } catch (FileMakerException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("End long running mark tickets as printed job");
        return true;
    }).emitOn(executor);
}
then the log file does show "Start long running mark tickets as printed job", but now my main REST service also blocks until the event is fully processed (which takes 10 seconds, so I have to do it in the background).
What is wrong here? I saw the same behaviour when I did not use the Quarkus event bus but called the Uni/executor code from a plain method. Using Java threads did not work either, because Quarkus waited until the thread had finished before sending the REST response.
At the end of the day, I just want that time-consuming task to run in the background, without the REST call waiting for its result.
Please help :-)
Thank you, schube
Upvotes: 1
Views: 2670
Reputation: 3306
There are several mistakes in the code.
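Always use a subscription
Mutiny is a reactive, event-driven library that evaluates pipelines lazily: building a Uni only describes the work, and nothing runs until something subscribes to it. That means any operation without a subscription will never run. In your void version nobody subscribes to the Uni, which is why the "Start long running..." line never appears; when the method returns the Uni instead, Quarkus subscribes to it for you.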
Uni.createFrom().voidItem().invoke(() -> {
    LOG.info("Start long running mark tickets as printed job");
    try {
        LOG.info("marked as printed");
    } catch (Exception e) {
        e.printStackTrace();
    }
    LOG.info("End long running mark tickets as printed job");
}).emitOn(executor).subscribe().with(
    item -> LOG.info("Finished"),
    failure -> LOG.error("Failed with " + failure)
);
NOTE: other methods are available, e.g. onItem(), onFailure(), and so on.
The output is:
2023-06-01 15:11:58,545 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) Received markSeatsAsPrinted event
2023-06-01 15:11:58,548 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) Start long running mark tickets as printed job
2023-06-01 15:11:58,549 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) marked as printed
2023-06-01 15:11:58,549 INFO [com.exa.TestRunner] (vert.x-eventloop-thread-9) End long running mark tickets as printed job
2023-06-01 15:11:58,551 INFO [com.exa.TestRunner] (executor-thread-1) Finished
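The lazy behaviour described above is not specific to Mutiny. As a dependency-free analogy (this is not Mutiny code), the sketch below uses the JDK's Stream API, where an intermediate operation such as peek() likewise does nothing until a terminal operation consumes the pipeline. The class name LazyPipelineDemo and the booking id are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class: a JDK-only analogy for "no subscription, no execution".
class LazyPipelineDemo {

    /** Describes a pipeline but never consumes it, so the side effect never runs. */
    static List<String> buildOnly() {
        List<String> log = new ArrayList<>();
        // peek() is an intermediate operation: it only records what should happen later.
        Stream<String> pipeline = Stream.of("booking-1")
                .peek(id -> log.add("processed " + id));
        return log; // the pipeline was never started
    }

    /** Same pipeline, but a terminal operation (forEach) actually triggers it. */
    static List<String> buildAndConsume() {
        List<String> log = new ArrayList<>();
        Stream.of("booking-1")
                .peek(id -> log.add("processed " + id))
                .forEach(id -> log.add("finished " + id));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(buildOnly());       // prints []
        System.out.println(buildAndConsume()); // prints [processed booking-1, finished booking-1]
    }
}
```

Here buildOnly() plays the role of the void consumer from the question, where the pipeline is described but never started, while buildAndConsume() corresponds to ending the chain with subscribe().with(...).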
Never block the event loop
As you can see, the previous operation was executed on the event-loop thread pool. As a golden rule, you should never block the event loop. When you have to run blocking operations (e.g. database modifications) or other long-running tasks, they have to be handed off to worker threads. If you put the @Blocking annotation (io.smallrye.common.annotation.Blocking) on the method, Quarkus will run it on a worker thread.
@ConsumeEvent("greeting")
@Blocking
public void markSeatsAsPrinted(String bookingId) {
    LOG.info("Received markSeatsAsPrinted event");
    Uni.createFrom().voidItem().invoke(() -> {
        LOG.info("Start long running mark tickets as printed job");
        try {
            printer.doSome(bookingId);
            LOG.info("marked as printed");
        } catch (Exception e) {
            e.printStackTrace();
        }
        LOG.info("End long running mark tickets as printed job");
    }).emitOn(executor).subscribe().with(
        item -> LOG.info("Finished"),
        failure -> LOG.error("Failed with " + failure)
    );
}
Now, the log output is:
2023-06-01 15:29:08,177 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:29:08,179 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) Start long running mark tickets as printed job
2023-06-01 15:29:08,180 INFO [com.exa.SomePrinter] (vert.x-worker-thread-1) Printing
2023-06-01 15:29:08,181 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) marked as printed
2023-06-01 15:29:08,182 INFO [com.exa.TestRunner] (vert.x-worker-thread-1) End long running mark tickets as printed job
2023-06-01 15:29:08,183 INFO [com.exa.TestRunner] (executor-thread-1) Finished
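The idea behind this rule can be sketched without Quarkus at all: the calling thread hands the slow work to a separate pool and carries on immediately. The following is a minimal, JDK-only sketch under that assumption. FireAndForgetDemo, the pool size and the sleep that stands in for the slow DAO call are all hypothetical, and the sketch does not model the Vert.x event loop itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: offloading slow work so the caller is not held up.
class FireAndForgetDemo {

    // Assumed stand-in for a dedicated worker pool; daemon threads let the JVM exit freely.
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "background-worker");
        t.setDaemon(true);
        return t;
    });

    /**
     * Submits the slow job to the worker pool and returns at once.
     * The returned future is only a handle; the caller is free to ignore it.
     */
    static CompletableFuture<Void> markSeatsAsPrinted(String bookingId, long workMillis) {
        return CompletableFuture.runAsync(() -> {
            System.out.println("marking seats for " + bookingId
                    + " on " + Thread.currentThread().getName());
            try {
                Thread.sleep(workMillis); // stands in for the slow, blocking DAO call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, WORKERS);
    }

    public static void main(String[] args) {
        CompletableFuture<Void> job = markSeatsAsPrinted("booking-1", 500);
        // The caller gets control back while the job is still sleeping on the worker.
        System.out.println("caller continues, job done? " + job.isDone());
        job.join(); // only so the demo does not exit before the job finishes
        System.out.println("job done? " + job.isDone());
    }
}
```

In a real endpoint the caller would simply not call join(); it appears here only to keep the demo process alive until the background job has finished.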
Suggestion: Use a custom worker instead of the event bus
As far as I understand the business requirement from the code, I don't think the event bus is the perfect solution. I may be wrong, but I would prefer to create a separate worker for the PDF generation.
Here is some groundwork.
@Singleton
@Startup
public class CustomWorker {

    private static final Logger LOG = Logger.getLogger(CustomWorker.class);

    private final WorkerExecutor executor;

    CustomWorker(Vertx vertx) {
        executor = vertx.createSharedWorkerExecutor("pdf-printer");
    }

    void tearDown(@Observes ShutdownEvent ev) {
        executor.close();
    }

    public void markSeatsAsPrinted(String bookingId) {
        LOG.info("Received markSeatsAsPrinted event");
        executor.executeBlocking(promise -> {
            LOG.info("Start long running mark tickets as printed job");
            try {
                // Block thread for 5 seconds
                Thread.sleep(5000L);
                LOG.info("marked as printed");
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOG.info("End long running mark tickets as printed job");
            promise.complete();
        });
    }
}
@Path("/booking")
public class ExampleResource {

    private static final Logger LOG = Logger.getLogger(ExampleResource.class);

    @Inject
    CustomWorker worker;

    @GET
    @Path("/{bookingId}/print-tickets/")
    @Produces(MediaType.APPLICATION_JSON)
    public PdfTicket printTickets(@PathParam("bookingId") String bookingId) {
        LOG.info("Got request");
        // I didn't implement much here (e.g. no DAO logic)
        var booking = new Booking(bookingId);
        //....
        PdfTicket pdfTicket = new PdfTicket(bookingId);
        // This is the code which should run in the background; the REST call should NOT wait for the result
        if (booking.hasFixedSeatingTickets()) {
            worker.markSeatsAsPrinted(booking.getBookingId());
        }
        LOG.info("Return response");
        return pdfTicket;
    }
}
The output is:
2023-06-01 15:43:05,127 INFO [com.exa.ExampleResource] (executor-thread-1) Got request
2023-06-01 15:43:05,128 INFO [com.exa.CustomWorker] (executor-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:43:05,130 INFO [com.exa.ExampleResource] (executor-thread-1) Return response
2023-06-01 15:43:05,130 INFO [com.exa.CustomWorker] (pdf-printer-1) Start long running mark tickets as printed job
2023-06-01 15:43:10,131 INFO [com.exa.CustomWorker] (pdf-printer-1) marked as printed
2023-06-01 15:43:10,132 INFO [com.exa.CustomWorker] (pdf-printer-1) End long running mark tickets as printed job
Of course, WorkerExecutor is customizable. You can also set poolSize and maxExecutionTime if needed.
Upvotes: 3