MoLt1eS
MoLt1eS

Reputation: 46

Quarkus AMQP send message to queue after request business logic

Once I receive a HTTP Get/Post I have to persist and object and then send a message to a queue where other services are listening to start doing other complex work

My current issue is that I can't just call a method with an @Outgoing("channel") annotation, I tried that and just keeps on executing the method without calling

Is there a way to call a method to send a JSON payload to a queue using the Quarkus framework?

PS: Im also trying to use rabbitMQ and switched back to ActiveMQ

Ive followed the Quarkus tuturial on reactive messaging and tried to register something on in implemented resource, but no luck

@Path("/part")
class PartService : PanacheRepository<PartDao>, Logging {

    @GET
    @Produces(MediaType.APPLICATION_JSON)
    @Transactional
    fun fetchParts(): List<PartDao> {

        val partDao = PartDao(label = "Test", status = PartStatus.INBANK, creatorId = "ghost-007")

        partDao.persist()

        if (partDao.isPersistent) {
            // Send a message to a queue -> PoC
            send(partDao)
        }

        return findAll().list()
    }

    @Outgoing("part-persisted")
    @Transactional
    fun send(partDao: PartDao): CompletionStage<AmqpMessage<*>> {
        val future = CompletableFuture<AmqpMessage<*>>()
        val message = "hello from sender"

        // Debug proposes
        println("Sending (data): $message")
        logger.debug(partDao.toString())

        future.complete(AmqpMessage(message))
        return future
    }

}

Expected:

Register message "hello from sender" in queue after doing:

curl http://localhost/part

Actual results:

send method just keeps on executing

Upvotes: 0

Views: 1849

Answers (1)

Michał Szynkiewicz
Michał Szynkiewicz

Reputation: 797

If I understand correctly, you want to call a method that would put something into a stream.

To my knowledge, you have to use an Emitter to do it, see e.g. https://github.com/michalszynkiewicz/devoxxpl-demo/blob/master/search/src/main/java/com/example/search/SearchEndpoint.java#L23

See https://smallrye.io/smallrye-reactive-messaging/#_stream documentation.

Upvotes: 1

Related Questions