Reputation: 8172
I want to process an AMQP message then send it to another queue for further processing.
I'm using Spring integration DSL to archive it as below,
@Bean
public IntegrationFlow ocr(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
OCRService ocrService) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_INCOMING_QUEUE)
.concurrentConsumers(2))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing ocr here
amqpTemplate.convertAndSend(NOTE_EXCHANGE, NOTE_OCRED_BINDING, note);
})
.get();
}
@Bean
public IntegrationFlow typeProcess(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, NOTE_OCRED_QUEUE)
.concurrentConsumers(4))
.transform(new JsonToObjectTransformer(Note.class))
.handle(msg -> {
// doing some work for type processing
}).get();
}
However I found that the message in NOTE_INCOMING_QUEUE queue still unacked when the new message is handling in type process phase. See below screenshot of rabbitmq management.
I'm wondering why the message in NOTE_INCOMING_QUEUE still unacked even though the handler already was executed successfully. Is it the design of spring integration amqp or something wrong in my code?
Upvotes: 0
Views: 202
Reputation: 174554
Use Amqp.inboundAdapter()
instead of the gateway - the gateway is waiting for a reply that will never arrive.
Gateways are for request/reply scenarios; channel adapters are for one-way scenarios.
Upvotes: 1