Erica Zhang

Reputation: 23

Spring AMQP RabbitMQ: how to prevent two parallel consumers from grabbing the same task at the same time?

I have two systems integrated with RabbitMQ.


Background

The client sends multiple request messages from a Spring-AMQP outbound-gateway to a RabbitMQ direct exchange. The direct exchange uses round-robin dispatching to distribute those messages to multiple workers. The workers are independent, located on different desktops, and run the same worker code in parallel, each processing different messages from the exchange via a SimpleMessageListenerContainer.


Logic Flow

Similar to the RabbitMQ tutorial on multiple workers with a direct exchange.

Client-----sends requests (5 tasks) to---->RabbitMQ-DirectExchange

then the direct exchange distributes those 5 tasks to the workers on PC1 (Worker1) and PC2 (Worker2).


ExchangeType & my Bindings

<!-- rabbit connection factory, rabbit template, and rabbit admin -->
 <rabbit:connection-factory
         id="connectionFactory"
         host="local IP address"
         username="guest"
         password="guest"
         channel-cache-size="10"   /> 

 <rabbit:template id="amqpTemplate" 
                  connection-factory="connectionFactory"
                  reply-timeout="600000" 
                  exchange="JobRequestDirectExchange"/>

 <rabbit:admin connection-factory="connectionFactory" id="rabbitAdmin" />

 <rabbit:direct-exchange name="taskRequests" 
                         auto-delete="false" 
                         durable="true"  >
     <rabbit:bindings>
        <rabbit:binding queue="jobRequests" key="request.doTask"   />
     </rabbit:bindings>

 </rabbit:direct-exchange>

 <rabbit:queue name="jobRequests" auto-delete="false" durable="true" />

Worker (consumer) configuration

<rabbit:listener-container  id="workerContainer" 
        acknowledge="auto"
        prefetch="1"                         
        connection-factory="connectionFactory">
    <rabbit:listener ref="taskWorker" queue-names="jobRequests" /> 
</rabbit:listener-container>    

The worker class is a simple POJO that processes the request and completes the task.
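For illustration, a minimal sketch of what such a worker POJO might look like (the class name and payload handling here are hypothetical; with the `<rabbit:listener ref="taskWorker" .../>` wiring, Spring AMQP wraps a plain POJO in a MessageListenerAdapter, which by default invokes a method named handleMessage with the converted payload):

```java
// Hypothetical worker POJO for the listener container above. Spring AMQP's
// default MessageListenerAdapter calls handleMessage(...) with the message
// payload after conversion.
public class TaskWorker {
    public String handleMessage(String task) {
        // Real task processing would happen here; this sketch just tags
        // the task as completed.
        return "done:" + task;
    }
}
```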


Using: RabbitMQ 3.2.2 with Spring-Integration-AMQP 2.2


What I expect

I expect Worker1 to receive some of the tasks while Worker2 picks up the rest (the other tasks).

I want the workers to process all 5 tasks in parallel. Each worker should handle only one task at a time; after finishing, it should be given another task, one by one (the rabbit listener is set to prefetch=1).

Such as

worker1: t2 t3 t5

worker2: t1 t4

But

After many runtime tests, sometimes the tasks are distributed correctly:

Worker1------task4 task1

Worker2------task3 task2 task5

But sometimes it goes wrong, like this:

Worker1------task4 task1

Worker2------task4 task2 task1

Apparently, task4 and task1 were picked up by both worker1 and worker2 at the same time.


Runtime test:

I checked that the client correctly sends the task1 task2 task3 task4 task5 request messages to the exchange, but on every run each worker receives different tasks. There is a common scenario that may trigger the wrong dispatching:

There are 5 tasks (t1, t2, t3, t4, t5) at the exchange, and they are sent to 2 parallel workers (w1, w2).

w1 got tasks: t2 t1 t4

w2 got tasks: t3 t1

With round-robin dispatching, w1 and w2 receive tasks in sequence.

w1 gets t2 and w2 gets t3.

While t2 and t3 are running, the broker sends t1 to w1 and waits for the ack from w1.

Suppose t2 takes longer to finish than t3 does, so w2 becomes free while w1 is still working on t1.

When w2 finishes t3, it receives t1 as well, because w2 is no longer busy and the broker has not yet received an ack for t1.

My understanding is

Both w1 and w2 are now processing the same task, t1. Whichever finishes t1 first sends an ack back to the broker, which then dequeues one message. Since t1 is completed (and acked) twice, the broker dequeues one more message than it should: t5's message gets dequeued because t1 was done twice. So all 5 messages end up acked and dequeued, but the two workers never processed t5 and processed t1 twice.


What should I do to prevent two parallel workers from grabbing the same message from the same Rabbit queue?

I tried the auto-ack way; the messages are acked correctly. But while the broker is waiting for a worker's ack, it may redispatch a message that is not yet acked but has already been delivered to another worker.

I have also thought about synchronizing the outgoing messages, or giving them priorities, but I have no clear idea how to accomplish that.

I would be grateful to hear any ideas about this problem. Thanks.

Upvotes: 2

Views: 10691

Answers (3)

tbg

Reputation: 151

Did you try setting the concurrentConsumers property on the listener container, as discussed here?
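As a sketch, that could look like the question's container with one added attribute (assuming the rabbit namespace's `concurrency` attribute, which maps to the container's concurrentConsumers property):

```xml
<rabbit:listener-container id="workerContainer"
        acknowledge="auto"
        prefetch="1"
        concurrency="2"
        connection-factory="connectionFactory">
    <rabbit:listener ref="taskWorker" queue-names="jobRequests" />
</rabbit:listener-container>
```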

Upvotes: 0

Erica Zhang

Reputation: 23

I tried for a long time to implement this feature the Spring-configured way, but failed.

In the end I came up with a workable solution using the RabbitMQ Java Client API.

Using a Spring asynchronous gateway with the Quartz scheduler, it always had problems sending messages as needed. I guess the reason is something multi-threading related.

At first, I thought it was because the Channel instance may be accessed concurrently by multiple threads, in which case confirms are not handled properly:

An important caveat to this is that confirms are not handled properly when a Channel is shared between multiple threads. In that scenario, it is therefore important to ensure that the Channel instance is not accessed concurrently by multiple threads.

The above is from http://www.rabbitmq.com/javadoc/com/rabbitmq/client/Channel.html

Finally, I decided to give up on the Spring way and went back to the RabbitMQ Java Client API (before, I used Spring XML to configure the gateway/channels; now I declare the exchange and channels programmatically with the Java client). I also added RabbitMQ RPC for the asynchronous callback. Now everything works fine for the current requirement.


So in summary, the final solution for my requirement is:

  • Use the RabbitMQ Java Client API to declare the exchange/channels/binding/routing key, on both the client and server side.

  • Use RabbitMQ RPC to implement the asynchronous callback feature.

(I followed RabbitMQ's Java tutorial: http://www.rabbitmq.com/tutorials/tutorial-six-java.html)
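A rough sketch of those declarations with the plain RabbitMQ Java Client might look like this (names are taken from the question's XML config; a broker on localhost is assumed, and the RPC callback part is omitted):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class WorkerSetup {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // assumption: broker on localhost

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declarations mirroring the question's XML: durable direct exchange,
        // durable queue, and a binding with the request.doTask routing key.
        channel.exchangeDeclare("taskRequests", "direct", true);
        channel.queueDeclare("jobRequests", true, false, false, null);
        channel.queueBind("jobRequests", "taskRequests", "request.doTask");

        // prefetch=1: the broker sends at most one unacked message per worker.
        channel.basicQos(1);
    }
}
```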

Upvotes: 0

cvbarros

Reputation: 1694

One thing I can think of that causes these duplicated messages for your consumers is a consumer closing the channel before sending an ack.

In that case, the RabbitMQ broker will requeue the message and set its redelivered flag to true. From the RabbitMQ docs:

If a message is delivered to a consumer and then requeued (because it was not acknowledged before the consumer connection dropped, for example) then RabbitMQ will set the redelivered flag on it when it is delivered again (whether to the same consumer or a different one). This is a hint that a consumer may have seen this message before (although that's not guaranteed, the message may have made it out of the broker but not into a consumer before the connection dropped). Conversely if the redelivered flag is not set then it is guaranteed that the message has not been seen before. Therefore if a consumer finds it more expensive to deduplicate messages or process them in an idempotent manner, it can do this only for messages with the redelivered flag set.

If, while testing, you close one of the worker processes before it sends an ack, or if a worker faults, this is very likely to happen. You can examine the redelivered flag to avoid the message being processed again by a different consumer, if that is the case.
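As a sketch of that idea (the helper class here is hypothetical, not part of any RabbitMQ API): a consumer can keep a set of processed message IDs and consult it only when the redelivered flag is set, since the docs guarantee that non-redelivered messages have not been seen before:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical deduplication helper: only consult the seen-set when the
// broker has flagged a delivery as redelivered, per the quoted docs.
public class RedeliveryFilter {
    private final Set<String> processed = new HashSet<>();

    /** Returns true if the message should be processed, false if it looks like a duplicate. */
    public boolean shouldProcess(String messageId, boolean redelivered) {
        if (redelivered && processed.contains(messageId)) {
            return false; // already handled here, and the broker hinted at redelivery
        }
        processed.add(messageId);
        return true;
    }
}
```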

Another thing I've noticed is the prefetch setting in your consumer configuration. You could set it to a higher value (tuned to your needs) instead of leaving it at just 1. You can learn more about prefetch here.

Hope that helps!

Upvotes: 1
