Reputation: 202
I´m using:
Basically, I have a SpringBoot application with an Apache Camel route which consumes messages from ActiveMQ with transactions. I need to set a RedeliveryPolicy on ActiveMQ, so when an error in processing happens, the message is retried a number of times.
I have made a configuration class with beans for ActiveMQ, the transactions work as intended but the RedeliveryPolicy does not work. Can anybody please help me understand what's wrong with this?
Here's the log output for a message that produces an error:
2018-10-23 10:35:28.005 DEBUG 10524 --- [mer[entryQueue]] o.a.c.s.spi.TransactionErrorHandler : Transaction begin (0x35d60381) redelivered(false) for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1)) 2018-10-23 10:35:28.020 DEBUG 10524 --- [mer[entryQueue]] o.apache.camel.processor.SendProcessor : >>>> direct://middle Exchange[ID-EPIC-LAP-25-1540312510586-0-1] 2018-10-23 10:35:28.375 DEBUG 10524 --- [mer[entryQueue]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1). On delivery attempt: 0 caught: java.lang.RuntimeException: ExceptionTest: Order Failed 2018-10-23 10:35:28.390 ERROR 10524 --- [mer[entryQueue]] o.a.camel.processor.DefaultErrorHandler : Failed delivery for (MessageId: ID:EPIC-LAP-25-50304-1540306817804-4:3:1:1:2 on ExchangeId: ID-EPIC-LAP-25-1540312510586-0-1). Exhausted after delivery attempt: 1 caught: java.lang.RuntimeException: ExceptionTest: Order Failed
Here's my config class for ActiveMQ:
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.activemq.RedeliveryPolicy
import org.apache.activemq.camel.component.ActiveMQComponent
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.jms.connection.JmsTransactionManager
import javax.jms.DeliveryMode
@Configuration
class ActiveMQConfiguration {
@Bean
ActiveMQConnectionFactory activeMQConnectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory()
activeMQConnectionFactory.brokerURL = 'tcp://localhost:61616'
activeMQConnectionFactory.userName = 'admin'
activeMQConnectionFactory.password = 'admin'
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy()
redeliveryPolicy.maximumRedeliveries = 3
redeliveryPolicy.redeliveryDelay = 150L
redeliveryPolicy.useExponentialBackOff = true
redeliveryPolicy.backOffMultiplier = 1.5
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy)
activeMQConnectionFactory
}
@Bean
ActiveMQComponent activeMQComponent(@Qualifier('activeMQConnectionFactory')ActiveMQConnectionFactory activeMQConnectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent()
activeMQComponent.connectionFactory = activeMQConnectionFactory
activeMQComponent.transacted = true
activeMQComponent.transactionManager = txManager()
activeMQComponent.cacheLevelName = 'CACHE_CONSUMER'
activeMQComponent.lazyCreateTransactionManager = false
activeMQComponent.deliveryMode = DeliveryMode.PERSISTENT
activeMQComponent
}
@Bean
JmsTransactionManager txManager(@Qualifier('activeMQConnectionFactory') ActiveMQConnectionFactory activeMQConnectionFactory) {
JmsTransactionManager txManager = new JmsTransactionManager()
txManager.connectionFactory = activeMQConnectionFactory
txManager.rollbackOnCommitFailure = true
txManager
}
}
Upvotes: 2
Views: 2572
Reputation: 7005
There are two issues here
1. You have two transaction managers
Due to the following two lines in your configuration of the Camel ActiveMQ component, you configure two transaction managers. That is a source of problems.
activeMQComponent.transacted = true // activates local JMS transactions
activeMQComponent.transactionManager = txManager() // additional tx manager
if you just want to consume transactional from ActiveMQ, you don't need to configure a Spring transaction manager.
These two lines of your config are enough to get local transactions with your ActiveMQ broker.
activeMQComponent.transacted = true
activeMQComponent.lazyCreateTransactionManager = false
So you should remove this line as well as the whole txManager
bean
activeMQComponent.transactionManager = txManager()
If you currently set the transacted flag in your Camel routes, you have to remove this too. And as I wrote, your routes consuming from ActiveMQ are still transacted even if you remove all this.
2. Redelivery not working
You have not published your Camel routes, but according to the error output, I assume that the broker does not redeliver because the error is handled by Camel.
It is the Camel error handler o.a.camel.processor.DefaultErrorHandler
that kicks in when the error occurs and because it handles the error, the message is commited against the broker and therefore no redelivery takes place.
Try to disable Camel error handling to see if the broker redelivers messages on errors.
errorHandler(noErrorHandler());
Upvotes: 1
Reputation: 631
Not very long ago I had problems with dlq queues - not all parameters set in the code worked. I had to add settings to acitvemq configs. Yes, it is not a good decision to divide configs but I didn't find another. Below is my config class for jms and an example queue configuration via activemq.xml:
@Configuration
@EnableJms
public class JmsConfig {
private Environment env;
@Autowired
public void setEnv(Environment env) {
this.env = env;
}
@Bean(name = "activemq")
public ActiveMQComponent activemq(@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager,
@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setTransactionManager(jmsTransactionManager);
activeMQComponent.setConnectionFactory(connectionFactory);
return activeMQComponent;
}
@Bean(name = "activemqJmsTemplate")
public JmsTemplate jmsTemplate(@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory);
return template;
}
@Bean(name = "activemqTransactionPolicy")
public SpringTransactionPolicy activemqTransactionPolicy(
@Qualifier("activemqTransactionManager") JmsTransactionManager jmsTransactionManager) {
SpringTransactionPolicy springTransactionPolicy = new SpringTransactionPolicy(jmsTransactionManager);
springTransactionPolicy.setPropagationBehaviorName("PROPAGATION_REQUIRED");
return springTransactionPolicy;
}
@Bean(name = "activemqTransactionManager")
public JmsTransactionManager activemqTransactionManager(
@Qualifier("activemqConnectionFactory") ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
@Bean(name = "activemqConnectionFactory")
public ConnectionFactory connectionFactory(@Qualifier("activemqRedeliveryPolicy") RedeliveryPolicy redeliveryPolicy) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://" + env.getProperty("queue.url"));
connectionFactory.setTrustAllPackages(true);
RedeliveryPolicyMap map = connectionFactory.getRedeliveryPolicyMap();
map.put(new ActiveMQQueue("queueName.DLQ"), redeliveryPolicy);
return connectionFactory;
}
@Bean(name = "activemqRedeliveryPolicy")
public RedeliveryPolicy redeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(0);
return redeliveryPolicy;
}
}
Changes in activevq.xml:
<destinationPolicy>
<policyMap>
<policyEntries>
<!--set dead letter queue for our queue. It name will be "myQueueName.DLQ"-->
<policyEntry queue="myQueueName">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="" queueSuffix=".DLQ"/>
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">">
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!--Set the redelivery delay to one hour-->
<redeliveryPolicy queue="myQueueName.DLQ" maximumRedeliveries="-1" redeliveryDelay="3600000"/>
</redeliveryPolicyEntries>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
Upvotes: 1