Kane

Reputation: 8172

How to properly handle an AMQP message and then send it to another queue in Spring Integration DSL

I want to process an AMQP message then send it to another queue for further processing.

I'm using the Spring Integration DSL to achieve this, as shown below:

@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
                           OCRService ocrService) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
            .concurrentConsumers(2))
            .transform(new JsonToObjectTransformer(Note.class))
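            .handle(msg -> {
                // the transformer above has already converted the payload to a Note
                Note note = (Note) msg.getPayload();
                // doing ocr here
                amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
            })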
            .get();
}

@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
    return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
            .concurrentConsumers(4))
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing some work for type processing
            }).get();
}

However, I found that the message in the NOTE_INCOMING_QUEUE queue is still unacked while the new message is being handled in the type-processing phase. See the screenshot of the RabbitMQ management UI below.

[Screenshot: RabbitMQ management UI]

I'm wondering why the message in NOTE_INCOMING_QUEUE is still unacked even though the handler has already executed successfully. Is this by design in Spring Integration AMQP, or is something wrong in my code?

Upvotes: 0

Views: 202

Answers (1)

Gary Russell

Reputation: 174554

Use Amqp.inboundAdapter() instead of the gateway. The gateway is waiting for a reply that will never arrive, which is why the message stays unacked.

Gateways are for request/reply scenarios; channel adapters are for one-way scenarios.
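
As a rough sketch of that change, the two flows from the question might look like the following with the inbound gateway swapped for an inbound channel adapter. Everything else (the queue and exchange constants, Note, OCRService) is taken from the question as-is. It assumes the adapter spec in your DSL version accepts the same concurrentConsumers(...) option that the question uses on the gateway spec; if it does not, that setting would need to be applied to the listener container instead.

```java
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
                           OCRService ocrService) {
    // One-way inbound channel adapter: no reply is expected from the flow
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, NOTE_INCOMING_QUEUE)
            .concurrentConsumers(2)) // assumption: same option as on the gateway spec
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                Note note = (Note) msg.getPayload();
                // doing ocr here
                amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
            })
            .get();
}

@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
    // Also one-way: this flow only consumes and never replies
    return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, NOTE_OCRED_QUEUE)
            .concurrentConsumers(4)) // assumption: same option as on the gateway spec
            .transform(new JsonToObjectTransformer(Note.class))
            .handle(msg -> {
                // doing some work for type processing
            })
            .get();
}
```

With a one-way adapter, nothing in the flow waits for a reply, so the message in NOTE_INCOMING_QUEUE should be acked once the handler returns rather than remaining unacked.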

Upvotes: 1
