schube
schube

Reputation: 730

Execute background job with quarkus and don't wait for result

My problem is, that I have a quarkus REST webservice which executes some tasks in a blocking/sequential way and then I want to execute a longer running task none blocking, but I found no running solution.

So the webservice looks like that (simplyfied!:

import io.vertx.mutiny.core.eventbus.EventBus;

@Path("/booking")
@ApplicationScoped
public class BookingResource {

    @Inject
    EventBus eventBus;

    @POST
    @Path("/{bookingId}/print-tickets/")
    @Produces(MediaType.APPLICATION_JSON)
    public PdfTicket printTickets(@PathParam("bookingId") String bookingId) throws Exception {

        var optBooking = daoBooking.getBookingDetailsById(bookingId);
        //....
        var eventOpt = daoEvent.getEventById(booking.getEventId());
        //...
        PdfTicket pdfTicket = myconverter(optBooking, eventOpt); //Some magic code which I removed here
    
        //this is the code which should run in background and the REST call should NOT wait vor the result
        if (booking.hasFixedSeatingTickets()) {
            // Mark seats as printed, so the fixed seats are not re-assigned to other guests
            eventBus.requestAndForget("greeting", booking.getBookingId());
        }
        return pdfTicket;
    }
}

I tried simple java threads, I tried Mutinity and lastly I tried the quarkus eventbus approach which you can see in the code above. So here I am sending a message to "greeting". I consume the event like that:

import io.quarkus.vertx.ConsumeEvent;
import io.smallrye.mutiny.Uni;

@ApplicationScoped
public class TestResource {
    private static final Logger LOG = Logger.getLogger(TestResource.class);

    @Inject
    private DaoBooking daoBooking;

    @Inject
    ManagedExecutor executor;

    @ConsumeEvent("greeting")
    public void markSeatsAsPrinted(String bookingId) {
        LOG.info("Received markSeatsAsPrinted event");
        Uni.createFrom().voidItem().invoke(() -> {
            LOG.info("Start long running mark tickets as printed job");
            try {
                daoBooking.markSeatsAsPrinted(bookingId);
            } catch (FileMakerException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            LOG.info("End long running mark tickets as printed job");

        }).emitOn(executor);

    }

}

According to https://quarkus.io/guides/reactive-event-bus when the method returns void the calling service should NOT wait for the result.

Now, when I call my REST service, I can see in the log "Received markSeatsAsPrinted event" but never "Start long running mark tickets as printed job". So the code in the Uni block is just not executed.

When I change my code to return a Uni<Boolean> like that:

@ConsumeEvent("greeting")
public Uni<Boolean> markSeatsAsPrinted(String bookingId) {
    LOG.info("Received markSeatsAsPrinted event");;
    return Uni.createFrom().item(() -> {
        LOG.info("Start long running mark tickets as printed job");
        try {
            daoBooking.markSeatsAsPrinted(bookingId);
        } catch (FileMakerException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        LOG.info("End long running mark tickets as printed job");
        return true;

    }).emitOn(executor);

}

then the logfile does display "Start long running mark tickets as printed job" but now, also my main REST service blocks until the event is fully processed (which takes 10 seconds, so I have to do it in the background).

What is wrong here? Also, I had the same behaviour, when I did not use quarkus event bus but the Uni/Executor stuff in a simple method. Also, using Java Threads did not work because quarkus waited until the thread was finished with the REST response.

At the end of the day I just want to have that task which takes so much time running in background without waiting for the answer in the REST call.

Please help :-)

Thank you, schube

Upvotes: 1

Views: 2670

Answers (1)

zforgo
zforgo

Reputation: 3306

There are several mistakes in the code.

Always use subscription

Mutiny as a reative and event driven library is using lazy evaluated events and subscriptions to run asynchronous code. That means, any operation without subscription will never run.

Uni.createFrom().voidItem().invoke(() -> {
    LOG.info("Start long running mark tickets as printed job");
    try {
        LOG.info("marked as printed");
    } catch (Exception e) {
        e.printStackTrace();
    }
    LOG.info("End long running mark tickets as printed job");

}).emitOn(executor).subscribe().with(
        item -> LOG.info("Finished"),
        failure -> LOG.error("Failed with " + failure)
);

NOTE: other methods are available e.q. onItem(), onFailure() and so on.

The output is:

2023-06-01 15:11:58,545 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) Received markSeatsAsPrinted event
2023-06-01 15:11:58,548 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) Start long running mark tickets as printed job
2023-06-01 15:11:58,549 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) marked as printed
2023-06-01 15:11:58,549 INFO  [com.exa.TestRunner] (vert.x-eventloop-thread-9) End long running mark tickets as printed job
2023-06-01 15:11:58,551 INFO  [com.exa.TestRunner] (executor-thread-1) Finished

Never block the event loop

As you can see the previous operation was executed on event-loop thread pool. As a golden rule you should never block the event loop. When you have to run some blocking operations (e.g database modifications) or other long running tasks it has to be propagated to worker threads. Using @Blocking annotation on the method Quarkus will run that on worker thread.

@ConsumeEvent("greeting")
@Blocking
public void markSeatsAsPrinted(String bookingId) {
    LOG.info("Received markSeatsAsPrinted event");
    Uni.createFrom().voidItem().invoke(() -> {
                LOG.info("Start long running mark tickets as printed job");
                try {
                    printer.doSome(bookingId);
                    LOG.info("marked as printed");
                } catch (Exception e) {
                    e.printStackTrace();
                }
                LOG.info("End long running mark tickets as printed job");

            }).emitOn(executor).
            subscribe().with(
                    item -> LOG.info("Finished"),
                    failure -> LOG.error("Failed with " + failure)
            );
}

Now, the log output is:

2023-06-01 15:29:08,177 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:29:08,179 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) Start long running mark tickets as printed job
2023-06-01 15:29:08,180 INFO  [com.exa.SomePrinter] (vert.x-worker-thread-1) Printing
2023-06-01 15:29:08,181 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) marked as printed
2023-06-01 15:29:08,182 INFO  [com.exa.TestRunner] (vert.x-worker-thread-1) End long running mark tickets as printed job
2023-06-01 15:29:08,183 INFO  [com.exa.TestRunner] (executor-thread-1) Finished

Suggestion: Use custom worker instead of event bus

As I understood the business requirement by the code I don't think eventbus is the perfect solution. Probably I'm wrong, but I prefer to create separated worker for the pdf generation.

Here is a ground work.

@Singleton
@Startup
public class CustomWorker {

    private static final Logger LOG = Logger.getLogger(CustomWorker.class);

    private final WorkerExecutor executor;

    CustomWorker(Vertx vertx) {
        executor = vertx.createSharedWorkerExecutor("pdf-printer");
    }

    void tearDown(@Observes ShutdownEvent ev) {
        executor.close();
    }

    public void markSeatsAsPrinted(String bookingId) {

        LOG.info("Received markSeatsAsPrinted event");
        executor.executeBlocking(promise -> {
            LOG.info("Start long running mark tickets as printed job");
            try {
                // Block thread for 5 seconds
                Thread.sleep(5000L);
                LOG.info("marked as printed");
            } catch (Exception e) {
                e.printStackTrace();
            }
            LOG.info("End long running mark tickets as printed job");
            promise.complete();
        });
    }
}
@Path("/booking")
public class ExampleResource {

    private static final Logger LOG = Logger.getLogger(ExampleResource.class);

    @Inject
    CustomWorker worker;

    @GET
    @Path("/{bookingId}/print-tickets/")
    @Produces(MediaType.APPLICATION_JSON)
    public PdfTicket printTickets(@PathParam("bookingId") String bookingId) {

        LOG.info("Got request");

        // I did't implement so much (e.g. DAO logic)
        var booking = new Booking(bookingId);
        //....
        PdfTicket pdfTicket = new PdfTicket(bookingId);

        //this is the code which should run in background and the REST call should NOT wait vor the result
        if (booking.hasFixedSeatingTickets()) {
            worker.markSeatsAsPrinted(booking.getBookingId());
        }

        LOG.info("Return response");
        return pdfTicket;
    }
}

The outout is:

2023-06-01 15:43:05,127 INFO  [com.exa.ExampleResource] (executor-thread-1) Got request
2023-06-01 15:43:05,128 INFO  [com.exa.CustomWorker] (executor-thread-1) Received markSeatsAsPrinted event
2023-06-01 15:43:05,130 INFO  [com.exa.ExampleResource] (executor-thread-1) Return response
2023-06-01 15:43:05,130 INFO  [com.exa.CustomWorker] (pdf-printer-1) Start long running mark tickets as printed job
2023-06-01 15:43:10,131 INFO  [com.exa.CustomWorker] (pdf-printer-1) marked as printed
2023-06-01 15:43:10,132 INFO  [com.exa.CustomWorker] (pdf-printer-1) End long running mark tickets as printed job

Of course, WorkerExecutor is customizable. You can also set poolSize and maxExecutionTime if it needed.

Upvotes: 3

Related Questions