Reputation: 23
I have a problem with Camel's transaction management.
In my example, I extract an XML file from a ZIP archive, create a JPA entity from the XML via JAXB, and write it to the database. Then I force a RuntimeException.
My expectation is that the inserted entities are rolled back, but they have already been committed.
I put a transacted call before the ZIP splitter so that either all of the contained files are processed or none are. The aggregator is responsible for assembling the metadata from the different files before it is written to the database.
Could somebody explain what is missing within the code or where my misunderstanding of Camel's transaction management lies?
Thank you in advance
Adrian
...
/**
 * Aggregation strategy that remembers the {@link DocumentMetaDataContainer} carried by the
 * first exchange it sees and attaches the byte content of every later exchange to that
 * container, keyed by the exchange's "CamelFileName" header.
 *
 * NOTE(review): the container lives in a single instance field, so it is shared by every
 * aggregation that uses this strategy instance - confirm that only one correlation group is
 * active at a time.
 */
private final class AggregationStrategyImplementation implements AggregationStrategy {

    /** Container taken from the first exchange; null until aggregate() has been called once. */
    DocumentMetaDataContainer documentMetaDataContainer;

    /**
     * Merges {@code newExchange} into the running aggregate.
     *
     * @param oldExchange the aggregate so far, or null for the first exchange
     * @param newExchange the exchange that just arrived
     * @return {@code newExchange} for the first call, otherwise {@code oldExchange}
     */
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First exchange: its body is expected to be the meta data container.
            Object firstBody = newExchange.getIn().getBody();
            documentMetaDataContainer = (DocumentMetaDataContainer) firstBody;
            return newExchange;
        }

        final String fileName = String.valueOf(newExchange.getIn().getHeader("CamelFileName"));
        System.out.println("aggregating " + fileName);

        // Attach the file content to the container element registered under this file name.
        final byte[] fileContent = newExchange.getIn().getBody(byte[].class);
        documentMetaDataContainer.putFileStreamToSpecificElement(fileName, fileContent);

        if (isDone()) {
            // NOTE(review): the header is set on the OUT message, not the IN message -
            // confirm that this is what the aggregator is expected to evaluate.
            oldExchange.getOut().setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
        }
        return oldExchange;
    }

    /**
     * Tells whether every document listed in the container has received its file.
     *
     * @return false when no container has been seen yet or when at least one listed
     *         document still has a null document file; true otherwise
     */
    public boolean isDone() {
        if (documentMetaDataContainer == null) {
            return false;
        }
        boolean complete = true;
        for (DocumentMetaData entry : documentMetaDataContainer.getListOfDocumentMetaData()) {
            if (entry.getDocumentFile() == null) {
                complete = false;
                break;
            }
        }
        return complete;
    }
}
/**
 * Boots the Spring context from "classpath:spring.xml", registers the import routes on the
 * Camel context obtained from it, lets them run for ten seconds and shuts everything down.
 *
 * The Camel context is stopped and the Spring context is closed in {@code finally} blocks so
 * that both are released even when route registration, start-up or the wait fails.
 *
 * @throws Exception if the contexts cannot be created or started, if the routes cannot be
 *                   added, or if the waiting thread is interrupted
 */
public void processImport() throws Exception {
    // Declared with the concrete type so that close() is available in the finally block.
    ClassPathXmlApplicationContext springContext = new ClassPathXmlApplicationContext(
            "classpath:spring.xml");
    try {
        final CamelContext camelContext = SpringCamelContext.springCamelContext(springContext);
        try {
            RouteBuilder routeBuilder = new RouteBuilder() {
                private static final String inbox = "/Development/ppaFiles";
                JAXBContext jaxbContext = JAXBContext.newInstance(new Class[] {
                    com.business.services.DocumentMetaDataContainer.class
                });
                DataFormat jaxbDataFormat = new JaxbDataFormat(jaxbContext);

                public void configure() {
                    // Entry route: picks up files from the inbox and forwards matching ZIP
                    // files, tagged with a "msgId" header used later as correlation key.
                    from("file:"+inbox+"?consumer.delay=1000&noop=true")
                        .routeId("scanDirectory")
                        .choice()
                            .when(header("CamelFileName").regex("badb_(.)*.zip"))
                                .setHeader("msgId").simple("${header.CamelFileName}_${date:now:S}")
                                .log("processing zip file, aggregating by ${header.msgId}")
                                .to("direct:splitZip")
                        .end();

                    // Transacted split of the ZIP: the meta data file goes to JAXB
                    // unmarshalling, every other entry is converted to a byte array.
                    from("direct:splitZip")
                        .routeId("splitZip")
                        .transacted()
                        .split(new ZipSplitter())
                        .streaming()
                        .choice()
                            .when(header("CamelFileName").regex("(.)*_files.xml")) // Meta File
                                .to("direct:toDocumentMetaData")
                            .otherwise() // PDF XML Files
                                .to("direct:toByteArray")
                        .end();

                    from("direct:toByteArray")
                        .routeId("toByteArray")
                        .convertBodyTo(byte[].class)
                        .to("direct:aggregateZipEntries");

                    from("direct:toDocumentMetaData")
                        .routeId("toDocumentMetaData")
                        .split()
                        // root tag name in xml file
                        .tokenizeXML("files")
                        .unmarshal(jaxbDataFormat)
                        .to("direct:aggregateZipEntries");

                    // Collects all entries of one ZIP (correlated by "msgId") and, once the
                    // completion timeout fires, multicasts the result to the save route and
                    // to the route that forces the error.
                    from("direct:aggregateZipEntries")
                        .routeId("aggregateZipEntries")
                        // force to start with meta data file ('..._files.xml')
                        .resequence(simple("${header.CamelFileName.endsWith('_files.xml')}"))
                        .allowDuplicates()
                        .reverse()
                        .aggregate(new AggregationStrategyImplementation())
                        .header("msgId")
                        .completionTimeout(2000L)
                        .multicast()
                        .to("direct:saveDocumentMetaData", "direct:doErrorProcessing");

                    // Writes each DocumentMetaData element of the aggregated body via JPA.
                    from("direct:saveDocumentMetaData")
                        .routeId("saveDocumentMetaData")
                        .split(simple("${body.listOfDocumentMetaData}"))
                        .multicast()
                        .to("jpa://com.business.persistence.entities.DocumentMetaData"+
                            "?persistenceUnit=persistenceUnit"+
                            "&consumer.transacted=true"+
                            "&transactionManager=#transactionManager"+
                            "&flushOnSend=false")
                        .log("processDocumentMetaData: ${body.getName()}");

                    // Always fails; used to provoke the rollback under test.
                    from("direct:doErrorProcessing")
                        .routeId("doErrorProcessing")
                        .process(new Processor() {
                            public void process(Exchange exchange) throws Exception {
                                throw new RuntimeException("Error");
                            }
                        });
                }
            };
            camelContext.addRoutes(routeBuilder);
            camelContext.start();
            // Give the file consumer and the aggregation timeout time to run.
            Thread.sleep(10000);
        } finally {
            // Stop Camel even if adding routes, starting or sleeping threw.
            camelContext.stop();
        }
    } finally {
        // Release the beans created by the Spring context.
        springContext.close();
    }
}
...
Upvotes: 2
Views: 1587
Reputation: 897
It seems that the transaction created in the "splitZip" route does not extend as far as the save operations in "saveDocumentMetaData" and the processing in "doErrorProcessing", perhaps because the aggregator is used without a persistent store. That is why the exception thrown in "doErrorProcessing" doesn't lead to a rollback in "saveDocumentMetaData".
To enclose "saveDocumentMetaData" and "doErrorProcessing" in one transaction, move the multicast into a new route that starts its own transaction:
// ...
.aggregate(new AggregationStrategyImplementation())
.header("msgId")
.completionTimeout(2000L)
// hand the completed aggregate over to a separate route instead of multicasting here
.to("direct:persist");
// new transacted route: the transaction is started here, so both multicast
// targets ("saveDocumentMetaData" and "doErrorProcessing") run inside it
from("direct:persist")
.routeId("persist")
.transacted()
.multicast()
.to("direct:saveDocumentMetaData", "direct:doErrorProcessing");
Upvotes: 4