Reputation: 45
I want to create a server that handles socket connections from users. Inside the server I want one RabbitMQ connection per user connection. The examples on the RabbitMQ website only show "while" loops that wait for the next message, which means I would need a dedicated thread per connection just to process messages from RabbitMQ.
Is there a way to do this in Java, using Spring or any other framework, where I just register a callback for RabbitMQ instead of writing while loops?
I was using node.js, where this is pretty straightforward, and I would like to know what the options are in Java.
Upvotes: 1
Views: 3808
Reputation: 364
You should take a look at Channel.basicConsume and the DefaultConsumer class: https://www.rabbitmq.com/api-guide.html#consuming
In Java the callback that handles each message still has to run on some thread, but you can use a thread pool to reuse threads instead of dedicating one to each connection.
    // requires java.util.concurrent.ExecutorService and java.util.concurrent.Executors
    static final ExecutorService threadPool = Executors.newCachedThreadPool();
Now you need to create a consumer that will handle each delivery by creating a Runnable instance that will be passed to the thread pool to execute.
    // channel must be final (or effectively final) so the Runnable below can capture it
    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            final byte[] msgBody = body; // a 'final' reference to the body that the Runnable can capture
            final long msgTag = envelope.getDeliveryTag();
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        // handle the message here
                        doStuff(msgBody);
                        // autoAck is false above, so acknowledge explicitly
                        channel.basicAck(msgTag, false);
                    } catch (IOException e) {
                        // basicAck throws IOException, which run() cannot propagate
                        e.printStackTrace();
                    }
                }
            };
            threadPool.submit(runnable);
        }
    });
This shows how you can handle concurrent deliveries on a single connection and channel without a while loop in a single thread that would block on each delivery. For your sanity, you will probably want to factor your Runnable implementation into its own class that accepts the channel, msgBody, msgTag and any other data as constructor parameters, so that they are accessible when the run() method is called.
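As a rough sketch of that refactoring, the Runnable could look something like the class below. To keep the example runnable without the RabbitMQ client on the classpath, the acknowledgement is hidden behind a small hypothetical interface named Acker; in real code it would presumably be implemented as `tag -> channel.basicAck(tag, false)`. The names MessageTask, Acker and MessageTaskDemo, and the use of a Consumer<byte[]> as the message handler, are assumptions made for illustration and are not part of the RabbitMQ API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical stand-in for the ack call, e.g. tag -> channel.basicAck(tag, false). */
interface Acker {
    void ack(long deliveryTag) throws IOException;
}

/** One unit of work: process a message body, then acknowledge its delivery tag. */
final class MessageTask implements Runnable {
    private final byte[] body;
    private final long deliveryTag;
    private final Consumer<byte[]> handler;
    private final Acker acker;

    MessageTask(byte[] body, long deliveryTag, Consumer<byte[]> handler, Acker acker) {
        this.body = body;
        this.deliveryTag = deliveryTag;
        this.handler = handler;
        this.acker = acker;
    }

    @Override
    public void run() {
        try {
            handler.accept(body);
            // Reached only if the handler returned normally.
            acker.ack(deliveryTag);
        } catch (IOException e) {
            // run() cannot declare checked exceptions, so report the failure here.
            System.err.println("ack failed for delivery " + deliveryTag + ": " + e.getMessage());
        }
    }
}

public class MessageTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // In handleDelivery this would be built from body and envelope.getDeliveryTag().
        pool.submit(new MessageTask(
                "hello".getBytes(StandardCharsets.UTF_8),
                1L,
                b -> System.out.println("got " + new String(b, StandardCharsets.UTF_8)),
                tag -> System.out.println("acked " + tag)));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

With a class like this, handleDelivery shrinks to a single threadPool.submit(new MessageTask(...)) call, and the task can be unit tested without a broker by passing in a fake Acker.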
Upvotes: 1