Reputation: 661
I'm trying to get a Camel route JMS->HTTP4 with Transaction working but the message is not transferred to ActiveMQ.DLQ when an Exception and I can't see why.
The example below illustrates what could happen if the server of the REST service is down and route cannot be delivered.
I get the correct Exception :
2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST
2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST]
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …
But the message is consumed and removed from queue. My assumption was that using transaction/transacted Camel and AMQ would resolve this and move the message to ActiveMQ.DLQ.
I have read chapter 9 of Camel in Action 1st Ed. and googled but haven't found any solution to my problem.
I know I can create/define my own TransactionErrorHandler() and store messages in a queue of my choice but I was under the impression that this was default when using transacted…
I'm using a standalone ActiveMQ 5.15.2 vanilla installed and config.
Camel 2.20.1
Java 8_144 on MacOS 10.13.2
My config:
@Configuration
public class Config {
/**
* The Camel context.
*/
final CamelContext camelContext;
/**
* The Broker url.
*/
@Value("${jms.broker.url}")
private String brokerURL;
/**
* Instantiates a new Config.
*
* @param camelContext the sisyfos context
* @param metricRegistry the metric registry
*/
@Autowired
public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
this.camelContext = camelContext;
this.metricRegistry = metricRegistry;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
/**
* Pooled connection factory pooled connection factory.
*
* @return the pooled connection factory
*/
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory() {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(8);
pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
return pooledConnectionFactory;
}
/**
* Jms configuration jms configuration.
*
* @return the jms configuration
*/
@Bean
public JmsConfiguration jmsConfiguration() {
final JmsConfiguration jmsConfiguration = new JmsConfiguration();
jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
jmsConfiguration.setTransacted(true);
jmsConfiguration.setTransactionManager(transactionManager());
jmsConfiguration.setConcurrentConsumers(10);
return jmsConfiguration;
}
/**
* Transaction manager jms transaction manager.
*
* @return the jms transaction manager
*/
@Bean
public JmsTransactionManager transactionManager() {
final JmsTransactionManager transactionManager = new JmsTransactionManager();
transactionManager.setConnectionFactory(pooledConnectionFactory());
return transactionManager;
}
/**
* Active mq component active mq component.
*
* @return the active mq component
*/
@Bean
public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
PooledConnectionFactory pooledConnectionFactory,
JmsTransactionManager transactionManager) {
final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConfiguration(jmsConfiguration);
activeMQComponent.setTransacted(true);
activeMQComponent.setUsePooledConnection(true);
activeMQComponent.setConnectionFactory(pooledConnectionFactory);
activeMQComponent.setTransactionManager(transactionManager);
return activeMQComponent;
}
}
My Route:
@Component
public class SendToCore extends SpringRouteBuilder {
@Override
public void configure() throws Exception {
Logger.getLogger(SendToCore.class).info("Sending to CORE");
//No retries if first fails due to connection error
interceptSendToEndpoint("http4:*")
.choice()
.when(header("JMSRedelivered").isEqualTo("false"))
.throwException(new ConnectException("Cannot connect to CORE REST"))
.end();
from("activemq:queue:myIncomingQueue")
.transacted()
.setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
.to("http4:localhost/myRESTservice")
.log("${header.CamelHttpResponseCode}")
.end();
}
}
You might find redundant declarations in some of the beans and that is me trying to resolve the issue…
Adding a link to a Github repo of mine with a small test project illustrating this:
https://github.com/hakuseki/transacted
Upvotes: 0
Views: 3516
Reputation: 7025
This is probably a problem of SpringBoot autoconfiguration.
If the messages get lost instead of going to the DLQ the ActiveMQ component of Camel does autocommit them instead of waiting until the work is done.
Notice: my config does not have a transaction manager because it is not needed for your case. Instead just set in the
ActiveMQComponent
transacted
totrue
andlazyCreateTransactionManager
tofalse
. Then you got a "local" transaction with your broker and that is all you need.
.transacted()
from your route (needs a transaction manager, but is not needed to have a "JMS local-transacted" route)MainApplication
: @SpringBootApplication(exclude = { JmsAutoConfiguration.class, ActiveMQAutoConfiguration.class})
Java configuration:
@Value("${jms.broker.url}")
String brokerURL;
@Bean
public ActiveMQConnectionFactory connectionFactory() {
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setBrokerURL(brokerURL);
return activeMQConnectionFactory;
}
@Bean
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory cf) {
final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
pooledConnectionFactory.setMaxConnections(1);
pooledConnectionFactory.setConnectionFactory(cf);
return pooledConnectionFactory;
}
@Bean(name = "activemq")
@ConditionalOnClass(ActiveMQComponent.class)
public ActiveMQComponent activeMQComponent(ConnectionFactory connectionFactory) {
ActiveMQComponent activeMQComponent = new ActiveMQComponent();
activeMQComponent.setConnectionFactory(connectionFactory);
activeMQComponent.setTransacted(true);
activeMQComponent.setLazyCreateTransactionManager(false);
return activeMQComponent;
}
Finally, just to "run" the route, I added a small Camel Route test
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest(classes = MainApplication.class)
public class SampleCamelApplicationTest {
@Produce(uri = "activemq:queue:myIncomingQueue")
protected ProducerTemplate template;
@Test
public void shouldProduceMessages() throws Exception {
template.sendBody("test");
Thread.sleep(20000); //wait for ActiveMQ redeliveries
}
}
If I run this test, the message is going to ActiveMQ.DLQ
.
Hope this helps
Upvotes: 3
Reputation: 55550
Just noticed that if you want Spring Boot to handle the lifecycle of those pool and configuration then you should not call their method directly, but let them be provided as parameters in the method signature
eg this
public ActiveMQComponent activeMQComponent() {
Should be
public ActiveMQComponent activeMQComponent(JmsConfiguration config, ConnectionFactory cf, ...) {
Then Spring Boot will provides these beans to you.
About why your transaction not works, then you can look at some of the transaction examples from the Camel in Action 2nd edition book: https://github.com/camelinaction/camelinaction2/tree/master/chapter12
Upvotes: 0