Ryan Walls
Ryan Walls

Reputation: 7092

How to poll an AMQP queue using Spring Integration

I have the use case that I need to wait 2 hours before consuming messages from an AMQP (we use Rabbit) queue.

EDIT: To clarify my use case... I need each individual message to wait 2 hours before being read. E.g. Message 1 arrives at 10am and Message 2 arrives at 10:15. I need Message 1 to be read at 12p and Message 2 to be read at 12:15p.

We are using Spring Integration 3.x.

The int-amqp:inbound-channel-adapter is message driven and doesn't have a polling option from what I can find.

A couple things I've thought of:

Any suggestions?

Upvotes: 0

Views: 2200

Answers (3)

Ryan Walls
Ryan Walls

Reputation: 7092

FYI, how I solved the issue. (Used solution #3).

<rabbit:queue name="delayQueue" durable="true">
  <rabbit:queue-arguments>
    <entry key="x-message-ttl">
       <value type="java.lang.Long">7200000</
     </entry>
     <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
     <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
  </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:topic-exchange name="finalDestinationTopic">
  <rabbit:bindings>
    <rabbit:binding queue="finalDestinationQueue" pattern="finalDestinationQueue"/>
  </rabbit:bindings>
</rabbit:topic-exchange>

Upvotes: 0

Artem Bilan
Artem Bilan

Reputation: 121560

Another trick is about to use PollableAmqpChannel:

<int-amqp:channel id="myQueueName" message-driven="false"/>

and provide the <poller> for the subscriber to that channel.

There is no reason to send messages to that channel (because you will poll messages from Rabbit Queue) and, right, it looks like anti-pattern, but it is a hook how to avoid any workarounds with direct RabbitTemplate usage via SpEL.

UPDATE

<delayer> can help you, but it depends of your requirements. If you don't want to poll messages from RabbitMQ, you should use the workaround above. But if you just don't want to process message until some time is elapsed, you can just 'delay' it for that time.

Don't forget to add persistent message-store to avoid losing messages during that period and unexpected application failure.

Upvotes: 1

Gary Russell
Gary Russell

Reputation: 174779

We don't currently have a polling inbound adapter. #1 is easy. For #2, the simplest would be to use a RabbitTemplate and invoke receive() from an inbound-channel-adapter in a POJO.

I would go with #1; you don't need quartz, you can use a simple Spring scheduled task and a control bus to start the adapter.

Upvotes: 1

Related Questions