Reputation: 37
I am trying to figure out the best way to handle errors that might have occurred in a service that is called after a aggregate's group timeout occurred that mimics the same flow as if the releaseExpression was met.
Here is my setup:
I have a AmqpInboundChannelAdapter that takes in messages and send them to my aggregator.
When the releaseExpression has been met and before the groupTimeout has expired, if an exception gets thrown in my ServiceActivator, the messages get sent to my dead letter queue for all the messages in that MessageGroup. (10 messages in my example below, which is only used for illustrative purposes) This is what I would expect.
If my releaseExpression hasn't been met but the groupTimeout has been met and the group times out, if an exception gets throw in my ServiceActivator, then the messages do not get sent to my dead letter queue and are acked.
After reading another blog post, link1 it mentions that this happens because the processing happens in another thread by the MessageGroupStoreReaper and not the one that the SimpleMessageListenerContainer was on. Once processing moves away from the SimpleMessageListener's thread, the messages will be auto ack.
I added the configuration mentioned in the link above and see the error messages getting sent to my error handler. My main question, is what is considered the best way to handle this scenario to minimize message getting lost.
Here are the options I was exploring:
Use a BatchRabbitTemplate in my custom error handler to publish the failed messaged to the same dead letter queue that they would have gone to if the releaseExpression was met. (This is the approach I outlined below but I am worried about messages getting lost, if an error happens during publishing)
Investigate if there is away I could let the SimpleMessageListener know about the error that occurred and have it send the batch of messages that failed to a dead letter queue? I doubt this is possible since it seems the messages are already acked.
Don't set the SimpleMessageListenerContainer to AcknowledgeMode.AUTO and manually ack the messages when they get processed via the Service when the releaseExpression being met or the groupTimeOut happening. (This seems kinda of messy, since there can be 1..N message in the MessageGroup but wanted to see what others have done)
Ideally, I want to have a flow that will that will mimic the same flow when the releaseExpression has been met, so that the messages don't get lost.
Does anyone have recommendation on the best way to handle this scenario they have used in the past?
Thanks for any help and/or advice!
Here is my current configuration using Spring Integration DSL
@Bean
public SimpleMessageListenerContainer workListenerContainer() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(rabbitConnectionFactory);
container.setQueues(worksQueue());
container.setConcurrentConsumers(4);
container.setDefaultRequeueRejected(false);
container.setTransactionManager(transactionManager);
container.setChannelTransacted(true);
container.setTxSize(10);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return container;
}
@Bean
public AmqpInboundChannelAdapter inboundRabbitMessages() {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(workListenerContainer());
return adapter;
}
I have defined a error channel and defined my own taskScheduler to use for the MessageStoreRepear
@Bean
public ThreadPoolTaskScheduler taskScheduler(){
ThreadPoolTaskScheduler ts = new ThreadPoolTaskScheduler();
MessagePublishingErrorHandler mpe = new MessagePublishingErrorHandler();
mpe.setDefaultErrorChannel(myErrorChannel());
ts.setErrorHandler(mpe);
return ts;
}
@Bean
public PollableChannel myErrorChannel() {
return new QueueChannel();
}
public IntegrationFlow aggregationFlow() {
return IntegrationFlows.from(inboundRabbitMessages())
.transform(Transformers.fromJson(SomeObject.class))
.aggregate(a->{
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
}
)
.handle("someService", "processMessages")
.get();
}
Here is my custom error flow
@Bean
public IntegrationFlow errorResponse() {
return IntegrationFlows.from("myErrorChannel")
.<MessagingException, Message<?>>transform(MessagingException::getFailedMessage,
e -> e.poller(p -> p.fixedDelay(100)))
.channel("myErrorChannelHandler")
.handle("myErrorHandler","handleFailedMessage")
.log()
.get();
}
Here is the custom error handler
@Component
public class MyErrorHandler {
@Autowired
BatchingRabbitTemplate batchingRabbitTemplate;
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(Message<?> message) {
ArrayList<SomeObject> payload = (ArrayList<SomeObject>)message.getPayload();
payload.forEach(m->batchingRabbitTemplate.convertAndSend("some.dlq","#", m));
}
}
Here is the BatchingRabbitTemplate bean
@Bean
public BatchingRabbitTemplate batchingRabbitTemplate() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(5);
scheduler.initialize();
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, Integer.MAX_VALUE, 30000);
BatchingRabbitTemplate batchingRabbitTemplate = new BatchingRabbitTemplate(batchingStrategy, scheduler);
batchingRabbitTemplate.setConnectionFactory(rabbitConnectionFactory);
return batchingRabbitTemplate;
}
Update 1) to show custom MessageGroupProcessor:
public class CustomAggregtingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected final Object aggregatePayloads(MessageGroup group, Map<String, Object> headers) {
return group;
}
}
Example Service:
@Slf4j
public class SomeService {
@ServiceActivator
public void processMessages(MessageGroup messageGroup) throws IOException {
Collection<Message<?>> messages = messageGroup.getMessages();
//Do business logic
//ack messages in the group
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug(" deliveryTag = {}",deliveryTag);
log.debug("Channel = {}",channel);
channel.basicAck(deliveryTag, false);
}
}
}
Updated integrationFlow
public IntegrationFlow aggregationFlowWithCustomMessageProcessor() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.releaseExpression("size() == 10");
a.transactional(true);
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
New ErrorHandler to do nack
public class MyErrorHandler {
@ServiceActivator(inputChannel = "myErrorChannelHandler")
public void handleFailedMessage(MessageGroup messageGroup) throws IOException {
if(messageGroup!=null) {
log.debug("Nack messages size = {}", messageGroup.getMessages().size());
Collection<Message<?>> messages = messageGroup.getMessages();
for (Message<?> m : messages) {
com.rabbitmq.client.Channel channel = (com.rabbitmq.client.Channel)
m.getHeaders().get("amqp_channel");
long deliveryTag = (long) m.getHeaders().get("amqp_deliveryTag");
log.debug("deliveryTag = {}",deliveryTag);
log.debug("channel = {}",channel);
channel.basicNack(deliveryTag, false, false);
}
}
}
}
Update 2 Added custom ReleaseStratgedy and change to aggegator
public class CustomMeasureGroupReleaseStratgedy implements ReleaseStrategy {
private static final int MAX_MESSAGE_COUNT = 10;
public boolean canRelease(MessageGroup messageGroup) {
return messageGroup.getMessages().size() >= MAX_MESSAGE_COUNT;
}
}
public IntegrationFlow aggregationFlowWithCustomMessageProcessorAndReleaseStratgedy() {
return IntegrationFlows.from(inboundRabbitMessages()).transform(Transformers.fromJson(SomeObject.class))
.aggregate(a -> {
a.sendPartialResultOnExpiry(true);
a.groupTimeout(3000);
a.expireGroupsUponCompletion(true);
a.expireGroupsUponTimeout(true);
a.correlationExpression("T(Thread).currentThread().id");
a.transactional(true);
a.releaseStrategy(new CustomMeasureGroupReleaseStratgedy());
a.outputProcessor(new CustomAggregtingMessageGroupProcessor());
}).handle("someService", "processMessages").get();
}
Upvotes: 2
Views: 2050
Reputation: 174554
There are some flaws in your understanding.If you use AUTO, only the last message will be dead-lettered when an exception occurs. Messages successfully deposited in the group, before the release, will be ack'd immediately.
The only way to achieve what you want is to use MANUAL acks.
There is no way to "tell the listener container to send messages to the DLQ". The container never sends messages to the DLQ, it rejects a message and the broker sends it to the DLX/DLQ.
Upvotes: 0