learner
learner

Reputation: 950

Connection to RabbitMQ Durable Queue fails over AMQP 1.0 protocol - Protonj2

Found Solution:- See solution at the end of this question.

I am trying to connect my AMQP 1.0 consumer (using Apache ProtonJ2 library). But my connection fails with following error

org.apache.qpid.protonj2.client.exceptions.ClientSessionRemotelyClosedException: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue_durable' in vhost '/': received 'false' but current is 'true' [condition = amqp:precondition-failed]

Following is my sample code.

public void connectAmqp() throws Throwable {
    final String serverHost = "localhost";
    final int serverPort = 5672;
    final String address = "test_queue_durable";
    final Client client = Client.create();

    final ConnectionOptions options = new ConnectionOptions().user("admin").password("admin");

    try{
        Connection connection = client.connect(serverHost, serverPort, options);
        
        Receiver receiver = connection.openReceiver(address);
        
        for (int i = 0; i < 100; ++i) {
            Delivery delivery = receiver.receive();
            System.out.println(delivery.message().body().getClass());
            System.out.println("*-*-*-*          " + new String((byte[])delivery.message().body()));
        }
    }catch (Exception e) {
        e.printStackTrace();
    }
}

Important points to Note:

  1. Queue is pre declared in RabbitMQ
  2. Queue is configured to be durable to prevent message loss
  3. Consumer connects successfully if durable property of queue is removed (but this is not intended)
  4. AMQP 1.0 protocol is required to connect
  5. Client library used to connect is Apache Qpid Proton J2.

Edited Resolution

    public void connectAmqp() throws Throwable {
    final String serverHost = "localhost";
    final int serverPort = 5672;
    final String address = "test_queue_durable";
    final Client client = Client.create();

    try{
        ConnectionOptions options = new ConnectionOptions().user("user").password("pa$$w0rd");
        Connection connection = client.connect(serverHost, serverPort, options);
        /**
         * Consumer Connecting to Durable Queue Connections
         * Set durable property on source link of receiveroptions
         */
        ReceiverOptions ro = new ReceiverOptions();
        ro.sourceOptions().durabilityMode(DurabilityMode.CONFIGURATION);
        
        Receiver receiver = connection.openReceiver(address, ro);
        
        for (int i = 0; i < 100; ++i) {
            Delivery delivery = receiver.receive();
            System.out.println(delivery.message().body().getClass());
            System.out.println("*-*-*-*          " + new String((byte[])delivery.message().body()));
        }
    }catch (Exception e) {
        e.printStackTrace();
    }
}

Upvotes: 0

Views: 399

Answers (2)

ciaranj
ciaranj

Reputation: 517

I ran into the same issue, taking into account your question point 1 'queue is predeclared in RabbitMQ', the specific fix for me was to add the prefix "/amq/queue/" to my existing queue names, this avoided the re-declaration mismatch that I think is being worked around in the current accepted answer.

So your address in the sample code should be "/amq/queue/test_queue_durable".

(This mattered for me because I wanted to use Quorum Queues, not just durable classic queues which introduced another problem.)

Upvotes: 0

Tim Bish
Tim Bish

Reputation: 18356

Likely you need to configure the Receiver Source values to match the Queue that you have created in your broker such that it allows the receiver to attach.

You'd need to do something like this (with configuration that satisfies the RabbitMQ attach prerequisites):

ReceiverOptions receiverOptions = new ReceiverOptions();
receiverOptions.sourceOptions().durabilityMode(DurabilityMode.CONFIGURATION);

Receiver receiver = session.openReceiver(address, receiverOptions);

Upvotes: 1

Related Questions