crawlero

Reputation: 45

Java: how to handle RabbitMQ messages with callbacks

I want to create a server that handles socket connections from users. Inside the server I want one RabbitMQ connection per user connection. The examples on the RabbitMQ website only show "while" loops that wait for a message, so I would need a thread per connection just to process the messages from RabbitMQ.

Is there a way to do this in Java, with Spring or any other framework, so that I just register a callback for RabbitMQ messages instead of using while loops?

I was using Node.js, where this is pretty straightforward, and I would like some suggestions for Java.

Upvotes: 1

Views: 3808

Answers (1)

DaveC

Reputation: 364

You should take a look at Channel.basicConsume and the DefaultConsumer class, a convenience implementation of the Consumer interface: https://www.rabbitmq.com/api-guide.html#consuming

The callback still has to run on some thread, but you do not need a dedicated thread per connection: you can hand each message to a thread pool, which reuses threads.

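import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Shared pool: a cached pool reuses idle threads and creates new ones on demand.
static final ExecutorService threadPool = Executors.newCachedThreadPool();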

Now you need to create a consumer that will handle each delivery by creating a Runnable instance that will be passed to the thread pool to execute.

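import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

// 'channel' must be final (or effectively final) so the anonymous classes can capture it.
// The second argument (autoAck = false) means each delivery is acknowledged manually.
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        final byte[] msgBody = body; // a 'final' reference to the body that you can pass to the runnable
        final long msgTag = envelope.getDeliveryTag();
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    // handle the message here
                    doStuff(msgBody);
                    channel.basicAck(msgTag, false); // acknowledge this delivery only
                } catch (IOException e) {
                    // basicAck throws a checked IOException, which run() cannot propagate
                    e.printStackTrace();
                }
            }
        };
        threadPool.submit(runnable);
    }
});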

This shows how you can handle concurrent deliveries on a single connection and channel without a while loop in a single thread that would be blocked on each delivery. For your sanity, you will probably want to factor your Runnable implementation into its own class that accepts the channel, msgBody, msgTag, and any other data as constructor parameters, so that they are available when the run() method is called.
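
One way the Runnable might be factored into its own class is sketched below. This is only an illustration, not part of the RabbitMQ client: the names DeliveryTask, MessageHandler, and Acker are hypothetical. The sketch assumes you pass the acknowledgement in as a small callback rather than passing the channel itself, which keeps the class free of RabbitMQ imports and easy to test.

```java
import java.io.IOException;

/** Hypothetical wrapper around the ack call (e.g. channel.basicAck). */
interface Acker {
    void ack(long deliveryTag) throws IOException;
}

/** Hypothetical message handler; stands in for doStuff(msgBody). */
interface MessageHandler {
    void handle(byte[] body) throws Exception;
}

/** One unit of work per delivery: process the body, then acknowledge it. */
final class DeliveryTask implements Runnable {
    private final byte[] body;
    private final long deliveryTag;
    private final MessageHandler handler;
    private final Acker acker;

    DeliveryTask(byte[] body, long deliveryTag, MessageHandler handler, Acker acker) {
        this.body = body;
        this.deliveryTag = deliveryTag;
        this.handler = handler;
        this.acker = acker;
    }

    @Override
    public void run() {
        try {
            handler.handle(body);
            // Acknowledge only after the handler finished without throwing.
            acker.ack(deliveryTag);
        } catch (Exception e) {
            // The delivery stays unacknowledged; decide here whether to
            // reject, requeue, or just log it.
            System.err.println("Delivery " + deliveryTag + " failed: " + e);
        }
    }
}
```

Inside handleDelivery, the anonymous Runnable could then be replaced with something like threadPool.submit(new DeliveryTask(body, envelope.getDeliveryTag(), bytes -> doStuff(bytes), tag -> channel.basicAck(tag, false))). Because the ack happens on a pool thread rather than the thread that received the delivery, it is probably safest to keep the second basicAck argument (multiple) set to false, as in the code above.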

Upvotes: 1
