Reputation: 143
I use Spring AMQP in my project, and I implement ChannelAwareMessageListener to handle re-sending and exceptions so that the Rabbit listener is more stable:
public abstract class AbstractMessageListener implements ChannelAwareMessageListener {

    @Autowired
    private Jackson2JsonMessageConverter messageConverter;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /** Where the consumer really does its business logic. */
    public abstract void receiveMessage(Message message, MessageConverter messageConverter);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        // Count the delivery attempts for this message id in a Redis hash.
        Long consumerCount = redisTemplate.opsForHash().increment(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
                messageProperties.getMessageId(), 1);
        try {
            receiveMessage(message, messageConverter);
            channel.basicAck(deliveryTag, false);
            // Success: the retry counter is no longer needed.
            redisTemplate.opsForHash().delete(MQConstants.MQ_CONSUMER_RETRY_COUNT_KEY,
                    messageProperties.getMessageId());
        } catch (Exception e) {
            if (consumerCount >= MQConstants.MAX_CONSUMER_COUNT) {
                // Too many attempts: reject without requeueing.
                channel.basicReject(deliveryTag, false);
            } else {
                // Back off exponentially, then requeue (note: this blocks the consumer thread).
                Thread.sleep((long) (Math.pow(MQConstants.BASE_NUM, consumerCount) * 1000));
                channel.basicNack(deliveryTag, false, true);
            }
        }
    }
}
Then we can receive messages by extending our AbstractMessageListener like this:
public class BizMessageListener extends AbstractMessageListener {

    Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void receiveMessage(Message message, MessageConverter messageConverter) {
        // do our own biz
    }
}
But one day my boss said this approach is too invasive and that I must use annotations instead, so I found something like this: Spring RabbitMQ - using manual channel acknowledgement on a service with @RabbitListener configuration
There I can use the annotation like this:
@RabbitListener(queues = "so38728668")
public void receive(String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag)
        throws IOException {
    // process the payload, then acknowledge manually
    channel.basicAck(tag, false);
}
But how can I wrap @RabbitListener at a higher level so that it incorporates the re-send code from my first code sample? For example, suppose there were an annotation called RabbitResenderListener:
@RabbitResenderListener(queues = "so38728668")
public void receive(Message msg)
        throws IOException {
    // just do biz
}
This annotation would give the method the re-send and error-handling ability, so that the method itself only does the business logic. Thanks.
Upvotes: 0
Views: 1489
Reputation: 121382
I'm not sure what you expect from us, but it seems to me that annotations are not meant for extension or for extracting abstractions the way we can with classes.
Anyway, I can share an idea you may try.
There is a method-level @RabbitHandler annotation. So you can mark a method in the superclass with it and do all the infrastructure logic there. The target implementation should be marked with its own @RabbitListener on the class to supply the required queues configuration. The @RabbitHandler method will then call the polymorphic method from the inheritor.
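A rough sketch of that shape, using plain Java stand-ins so it runs without a broker. Acknowledger, RetryingHandler, BizHandler and the limit of 3 attempts are hypothetical names and values, the in-memory map replaces your Redis counter, and the back-off sleep is left out. The comments mark where I would expect the Spring AMQP annotations to go; I have not tested it against a real listener container.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for the ack / nack / reject calls on the RabbitMQ Channel. */
interface Acknowledger {
    void ack();
    void requeue();
    void reject();
}

/** Superclass that owns the retry bookkeeping (the infrastructure logic). */
abstract class RetryingHandler {
    private static final int MAX_ATTEMPTS = 3; // assumed value, plays the role of MAX_CONSUMER_COUNT

    // In-memory stand-in for the Redis hash of retry counts.
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    /** The polymorphic method: subclasses put only business logic here. */
    protected abstract void receiveMessage(String payload) throws Exception;

    // With Spring AMQP, this is the method that would carry @RabbitHandler.
    public final void handle(String messageId, String payload, Acknowledger acker) {
        int attempt = attempts.merge(messageId, 1, Integer::sum);
        try {
            receiveMessage(payload);
            acker.ack();
            attempts.remove(messageId);
        } catch (Exception e) {
            if (attempt >= MAX_ATTEMPTS) {
                attempts.remove(messageId);
                acker.reject();  // give up, do not requeue
            } else {
                acker.requeue(); // let the broker redeliver
            }
        }
    }
}

// With Spring AMQP, this class would carry @RabbitListener(queues = "so38728668").
class BizHandler extends RetryingHandler {
    @Override
    protected void receiveMessage(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
        // do our own biz
    }
}
```

The inheritor then contains nothing but the business method, which is close to what the RabbitResenderListener in the question was meant to achieve, only via inheritance rather than a custom annotation.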
Does it sound reasonable?
Upvotes: 0