ComeOnWang
ComeOnWang

Reputation: 1

Orderly message send back %RETRY%CONSUMERGROUP

I have a question about the method of RocektMQ ConsumeMessageOrderlyService.sendMessageBack, The method comment:max reconsume times exceeded then send to dead letter queue。 But the message was actually sent to %RETYE%ConsumerGroup, this means that the message will then be consumed by the same group of consumers,and I have tried, the message was indeed sent first to SCHEDULE_TOPIC_XXXX, then delivered to %RETRY%ConsumerGroup, and never sent to Dead-Letter Queue.

this is the source code of ConsumeMessageOrderlyService.sendMessageBack:

    public boolean sendMessageBack(final MessageExt msg) {
    try {
        // max reconsume times exceeded then send to dead letter queue.
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

        this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }

    return false;
}

I am confused about it.

Upvotes: -2

Views: 75

Answers (1)

ComeOnWang
ComeOnWang

Reputation: 1

In rocketMQ 5.0.0 is a bug. I have submitted this bug. https://github.com/apache/rocketmq/issues/6163

Upvotes: 0

Related Questions