Fulo Lin

Reputation: 55

RabbitMQ cluster: when a node is offline, how can I ensure that its queue still receives messages from the fanout exchange?

I have a three-node cluster and two queues bound to one fanout exchange. My requirement is that every message sent to the exchange must be stored in both queues; no message may be lost, and every message must be processed.

When one of the nodes is offline, the queue hosted on that node misses the messages that the exchange receives in the meantime. I may be able to use quorum queues, but in a three-node cluster they tolerate only one node going offline. If two nodes are offline, the same problem occurs. Is there any solution?
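For reference, the one-node limit follows from the majority rule that quorum queues rely on: a queue stays available only while more than half of its members are online. A minimal sketch of that arithmetic (the function names are my own, not part of any RabbitMQ API):

```typescript
// Members that must be online for a quorum queue to keep accepting messages.
function majority(members: number): number {
  return Math.floor(members / 2) + 1;
}

// Members that may be offline while the queue remains available.
function tolerableFailures(members: number): number {
  return members - majority(members);
}

console.log(tolerableFailures(3)); // 1
console.log(tolerableFailures(5)); // 2
```

So tolerating two offline nodes would need a quorum queue with five members, which in turn means a cluster of at least five nodes.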

Update: (2023-10-03)

To describe my problem in more detail: queue-A and queue-B are bound to MyFanoutExchange. queue-A is located on node rabbitmq-1 and queue-B on node rabbitmq-2. While all nodes are online, every message sent to MyFanoutExchange goes to both queue-A and queue-B, which is what I want.

Exchange          Status   Nodes       Queues
MyFanoutExchange  online   rabbitmq-0  -
                  online   rabbitmq-1  queue-A
                  online   rabbitmq-2  queue-B

When node rabbitmq-1 is offline, messages sent to MyFanoutExchange go to queue-B but not to queue-A, and queue-A does not receive them even after rabbitmq-1 comes back online.

Exchange          Status   Nodes       Queues
MyFanoutExchange  online   rabbitmq-0  -
                  offline  rabbitmq-1  queue-A
                  online   rabbitmq-2  queue-B

I need to ensure that queue-A and queue-B can receive all messages from MyFanoutExchange.

Upvotes: 1

Views: 838

Answers (1)

Chito Grito

Reputation: 11

You can just use a single queue bound to a direct exchange. Your three nodes will then process its messages one at a time, and that will be faster. You can also create a reserve (retry) queue for messages that fail processing, using these queue arguments:

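import * as amqplib from 'amqplib';

// FACTORY_GATEWAY_EXCHANGE, RETRY_CONST, REQUEUE_DEALAY_CONST, queueNames()
// and Logger come from the rest of the project and are not shown here.

// Declares the main exchange and queues. A rejected or expired message is
// dead-lettered to the retry exchange under the routing key plus RETRY_CONST.
async function initBaseQueues(
  ch: amqplib.Channel,
  hash: Record<string, string>,
) {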
  const queues = Object.keys(hash);

  await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE, 'direct');

  for (let i = 0; i < queues.length; i++) {
    const queue = queues[i];

    const routingKey = hash[queue];

    await ch.assertQueue(queue, {
      autoDelete: false,
      durable: true,
      arguments: {
        [ERabbitQueueArguments.deadLetterExchange]:
          FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
        [ERabbitQueueArguments.deadLetterRoutingKey]: routingKey + RETRY_CONST,
      },
    });

    await ch.bindQueue(queue, FACTORY_GATEWAY_EXCHANGE, routingKey);

    Logger.log(`Queue ${queue} initialized`);
  }
}

async function initRetryQueues(
  ch: amqplib.Channel,
  hash: Record<string, string>,
) {
  const queues = Object.keys(hash);

  await ch.assertExchange(FACTORY_GATEWAY_EXCHANGE + RETRY_CONST, 'direct');

  for (let i = 0; i < queues.length; i++) {
    const queue = queues[i];

    const routingKey = hash[queue];

    await ch.assertQueue(queue + RETRY_CONST, {
      autoDelete: false,
      durable: true,
      arguments: {
        [ERabbitQueueArguments.deadLetterExchange]: FACTORY_GATEWAY_EXCHANGE,
        [ERabbitQueueArguments.deadLetterRoutingKey]: routingKey,
        [ERabbitQueueArguments.messageTTL]: REQUEUE_DEALAY_CONST,
      },
    });

    await ch.bindQueue(
      queue + RETRY_CONST,
      FACTORY_GATEWAY_EXCHANGE + RETRY_CONST,
      routingKey + RETRY_CONST,
    );

    Logger.log(`Queue ${queue + RETRY_CONST} initialized`);
  }
}

export default async function initRabbitQueues() {
  const conn = await amqplib.connect(
    process.env.RABBIT_HOST ?? 'amqp://localhost:5672',
  );

  const ch1 = await conn.createChannel();

  const hash = queueNames();

  await initBaseQueues(ch1, hash);

  await initRetryQueues(ch1, hash);

  await conn.close();
}

The queue arguments are:

export enum ERabbitQueueArguments {
  deadLetterExchange = 'x-dead-letter-exchange',
  deadLetterRoutingKey = 'x-dead-letter-routing-key',
  messageTTL = 'x-message-ttl',
}
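To make the round trip in the code above easier to follow, here is a minimal sketch that only computes the names and arguments for one base/retry pair, without talking to a broker. The helper `retryPair`, the `QueueDecl` shape, and the sample values (`'gateway'`, `'orders'`, `'.retry'`, `5000`) are hypothetical stand-ins for the project's own constants:

```typescript
type QueueArgs = Record<string, string | number>;

interface QueueDecl {
  queue: string;      // queue name
  exchange: string;   // exchange the queue is bound to
  bindingKey: string; // routing key used for the binding
  args: QueueArgs;    // queue arguments passed at declaration
}

// Hypothetical helper: derives the base queue and its retry queue.
function retryPair(
  exchange: string,
  queue: string,
  routingKey: string,
  suffix: string,
  delayMs: number,
): { base: QueueDecl; retry: QueueDecl } {
  // Base queue: rejected messages are dead-lettered to the retry exchange.
  const base: QueueDecl = {
    queue,
    exchange,
    bindingKey: routingKey,
    args: {
      'x-dead-letter-exchange': exchange + suffix,
      'x-dead-letter-routing-key': routingKey + suffix,
    },
  };
  // Retry queue: has no consumers; once the TTL expires, the message is
  // dead-lettered back to the base exchange with the original routing key.
  const retry: QueueDecl = {
    queue: queue + suffix,
    exchange: exchange + suffix,
    bindingKey: routingKey + suffix,
    args: {
      'x-dead-letter-exchange': exchange,
      'x-dead-letter-routing-key': routingKey,
      'x-message-ttl': delayMs,
    },
  };
  return { base, retry };
}

const { base, retry } = retryPair('gateway', 'orders', 'orders.key', '.retry', 5000);
console.log(base.args, retry.args);
```

A message that the consumer rejects without requeueing leaves the base queue, waits in the retry queue for the TTL, and then returns to the base queue for another attempt.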

Upvotes: 1
