Reputation: 1
I have a question about the method of RocektMQ ConsumeMessageOrderlyService.sendMessageBack, The method comment:max reconsume times exceeded then send to dead letter queue。 But the message was actually sent to %RETYE%ConsumerGroup, this means that the message will then be consumed by the same group of consumers,and I have tried, the message was indeed sent first to SCHEDULE_TOPIC_XXXX, then delivered to %RETRY%ConsumerGroup, and never sent to Dead-Letter Queue.
this is the source code of ConsumeMessageOrderlyService.sendMessageBack:
public boolean sendMessageBack(final MessageExt msg) {
try {
// max reconsume times exceeded then send to dead letter queue.
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
MessageAccessor.setProperties(newMsg, msg.getProperties());
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}
return false;
}
I am confused about it.
Upvotes: -2
Views: 75
Reputation: 1
In rocketMQ 5.0.0 is a bug. I have submitted this bug. https://github.com/apache/rocketmq/issues/6163
Upvotes: 0