Rinat Mukhamedgaliev

Reputation: 5721

RabbitMQ subscribe

I use RabbitMQ (version 3.3.5) to connect the parts of my program. It is used with the Java client from the repository.

// Connection part
@Inject
public AMQService(RabbitMQConfig mqConfig) throws IOException {
    this.mqConfig = mqConfig;
    connectionFactory.setHost(mqConfig.getRABBIT_HOST());
    connectionFactory.setUsername(mqConfig.getRABBIT_USERNAME());
    connectionFactory.setPassword(mqConfig.getRABBIT_PASSWORD());
    connectionFactory.setAutomaticRecoveryEnabled(true);
    connectionFactory.setPort(mqConfig.getRABBIT_PORT());
    connectionFactory.setVirtualHost(mqConfig.getRABBIT_VHOST());
    Connection connection = connectionFactory.newConnection();
    channel = connection.createChannel();
    channel.basicQos(1);
}

// Consume part
private static void consumeResultQueue() {
    final QueueingConsumer consumer = new QueueingConsumer(channel);
    Future resultQueue = EXECUTOR_SERVICE.submit((Callable<Object>) () -> {
        channel.basicConsume("resultQueue", true, consumer);
        while (true) {
            try {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody(), "UTF-8");
                resultListener.onMessage(message);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}

I want to get rid of the infinite loop. Can RabbitMQ notify the client when a message is available in the queue, so that the client does not have to keep checking?

Upvotes: 1

Views: 335

Answers (1)

archz

Reputation: 1072

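You can create a class that extends DefaultConsumer and overrides handleDelivery. The client library then calls handleDelivery for every message the broker pushes to the consumer, so no polling loop is needed.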

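import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

public class MyConsumer extends DefaultConsumer {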

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                           AMQP.BasicProperties properties, byte[] body) throws IOException {
        // do your computation
    }
}

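Then register this consumer with channel.basicConsume(queueName, myConsumerInstance);

Be aware that this two-argument overload uses explicit acknowledgements. With basicQos(1), as in your code, the broker will not send the next message until the previous one is acknowledged, so either call channel.basicAck(envelope.getDeliveryTag(), false) in handleDelivery or register with channel.basicConsume(queueName, true, myConsumerInstance) to keep the auto-ack behaviour you have now.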

Note that handleDelivery runs on a thread from the RabbitMQ client's own thread pool, so you should avoid any long computation inside this method. If processing a message is slow, hand the work off to an executor that you control.
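One possible way to keep handleDelivery short is to decode the body and pass it straight to your own worker pool. The sketch below uses only the JDK so it can be run on its own; DeliveryHandOff and onDelivery are hypothetical names, not part of the RabbitMQ client, and the listener stands in for something like resultListener.onMessage from the question.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a RabbitMQ class): call onDelivery(body) from
 * handleDelivery so the client's consumer thread can return immediately.
 */
class DeliveryHandOff {
    private final ExecutorService workers;
    private final Consumer<String> listener;

    DeliveryHandOff(int workerThreads, Consumer<String> listener) {
        this.workers = Executors.newFixedThreadPool(workerThreads);
        this.listener = listener;
    }

    /** Decodes the body as UTF-8 and queues the slow work on the worker pool. */
    void onDelivery(byte[] body) {
        String message = new String(body, StandardCharsets.UTF_8);
        workers.submit(() -> listener.accept(message));
    }

    /** Stops accepting new work and waits for queued messages to finish. */
    boolean shutdown(long timeoutSeconds) {
        workers.shutdown();
        try {
            return workers.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that we did not finish cleanly.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Inside MyConsumer you would keep one DeliveryHandOff instance and call handOff.onDelivery(body) from handleDelivery. With more than one worker thread, messages may be processed out of order; use a single worker thread if ordering matters.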

Upvotes: 2
