Reputation: 402
Scenario: I am trying to stream and process some large XML files. These files are sent by a producer asynchronously.
producerTemplate.sendBodyAndHeaders(endpointUri, inStream, ImmutableMap.of(JOBID_PROPERTY, importJob.getId()));
I need to batch all file input streams, identify the files by probing them with XPath, and reorder them according to their content. I have the following route:
from("direct:route1")
    .streamCaching()
    .choice()
        .when(xpath("//Tag1"))
            .setHeader("execOrder", constant(3))
            .setHeader("xmlRoute", constant("direct:some-route"))
        .when(xpath("//Tag2"))
            .setHeader("execOrder", constant(1))
            .setHeader("xmlRoute", constant("direct:some-other-route"))
        .when(xpath("//Tag3"))
            .setHeader("execOrder", constant(2))
            .setHeader("xmlRoute", constant("direct:yet-another-route"))
        .otherwise()
            .to("direct:somewhereelse")
    .end()
    .resequence(header("execOrder"))
        .batch(new BatchResequencerConfig(300, 10000L))
        .allowDuplicates()
    .recipientList(header("xmlRoute"));
When running my code I get the following error:
2017-11-23 11:43:13.442 INFO 10267 --- [ - Batch Sender] c.w.n.s.m.DefaultImportJobService : Updating entity ImportJob with id 5a16a61803af33281b22c716
2017-11-23 11:43:13.451 WARN 10267 --- [ - Batch Sender] org.apache.camel.processor.Resequencer : Error processing aggregated exchange: Exchange[ID-int-0-142-bcd-wsint-pro-59594-1511433568520-0-20]. Caused by: [org.apache.camel.RuntimeCamelException - Cannot reset stream from file /var/folders/dc/fkrgdrnx6txbg7jfdjd_58mm0000gn/T/camel/camel-tmp-39abaae8-9bdd-435a-b63d-299ad8b06415/cos1499080503439465502.tmp]
org.apache.camel.RuntimeCamelException: Cannot reset stream from file /var/folders/dc/fkrgdrnx6txbg7jfdjd_58mm0000gn/T/camel/camel-tmp-39abaae8-9bdd-435a-b63d-299ad8b06415/cos1499080503439465502.tmp
at org.apache.camel.converter.stream.FileInputStreamCache.reset(FileInputStreamCache.java:91)
I've read here that the FileInputStreamCache is closed when XPathBuilder.getDocument() is called and the temp file is deleted, so you get a FileNotFoundException when the XPathBuilder wants to reset the InputStream.
The solution seems to be to disable spooling to disk like this:
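To make that failure mode concrete, here is a minimal plain-Java sketch of a file-backed stream cache whose temp file is deleted on close. It is not Camel's FileInputStreamCache; the class name TempFileStreamCache and its methods are hypothetical and only illustrate why a reset after the file is gone cannot succeed.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in for a file-backed stream cache; NOT Camel's implementation.
class TempFileStreamCache implements AutoCloseable {
    private final Path tempFile;
    private InputStream current;

    TempFileStreamCache(byte[] payload) throws IOException {
        // Spool the payload to a temp file instead of holding it in memory.
        tempFile = Files.createTempFile("cache-sketch", ".tmp");
        Files.write(tempFile, payload);
        current = Files.newInputStream(tempFile);
    }

    // Re-open the backing file so the payload can be read again from the start.
    void reset() throws IOException {
        current.close();
        // Throws an IOException once the backing file has been deleted.
        current = Files.newInputStream(tempFile);
    }

    int read() throws IOException {
        return current.read();
    }

    // Closing the cache also removes the spooled file.
    @Override
    public void close() throws IOException {
        current.close();
        Files.deleteIfExists(tempFile);
    }

    public static void main(String[] args) throws IOException {
        TempFileStreamCache cache =
                new TempFileStreamCache("<Tag1/>".getBytes(StandardCharsets.UTF_8));
        cache.reset();      // works: the temp file still exists
        cache.close();      // deletes the temp file
        try {
            cache.reset();  // fails: nothing left to re-open
        } catch (IOException e) {
            System.out.println("Cannot reset stream: " + e);
        }
    }
}
```

Any component that holds on to such a cache and calls reset() after the owner has closed it will hit the same kind of error.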
camelContext.getStreamCachingStrategy().setSpoolThreshold(-1);
However, I don't want to do that because of RAM restrictions: files can get up to 600MB and I don't want to keep them in memory. Any ideas how to solve the problem?
Upvotes: 2
Views: 1650
Reputation: 402
I ended up doing what Claus and Ricardo suggested. I made a separate route that saves the files to disk, and another one that probes the files and resequences the exchanges according to a fixed order.
String xmlUploadDirectory = "file://" + Files.createTempDir().path + "/xmls?noop=true"

from("direct:route1")
    .to(xmlUploadDirectory)

from(xmlUploadDirectory)
    .choice()
        .when(xpath("//Tag1"))
            .setHeader("execOrder", constant(3))
            .setHeader("xmlRoute", constant("direct:some-route"))
        .when(xpath("//Tag2"))
            .setHeader("execOrder", constant(1))
            .setHeader("xmlRoute", constant("direct:some-other-route"))
        .when(xpath("//Tag3"))
            .setHeader("execOrder", constant(2))
            .setHeader("xmlRoute", constant("direct:yet-another-route"))
        .otherwise()
            .to("direct:somewhereelse")
    .end()
    .to("direct:resequencing")

from("direct:resequencing")
    .resequence(header("execOrder"))
        .batch(new BatchResequencerConfig(300, 10000L))
        .allowDuplicates()
    .recipientList(header("xmlRoute"))
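For anyone curious what the probing step amounts to outside of Camel, below is a rough plain-Java sketch (Java 9+) that scans a file on disk with the JDK's StAX parser and stops at the first known tag, so the whole document never has to sit in memory. This is not what the route above does internally; the class XmlProbe and the method probe are made-up names. Unlike the choice() block, which tests //Tag1, //Tag2 and //Tag3 in that order, this sketch returns whichever known tag appears first in the document.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;

// Hypothetical helper; mirrors the execOrder values used in the route.
class XmlProbe {
    static final Map<String, Integer> EXEC_ORDER =
            Map.of("Tag1", 3, "Tag2", 1, "Tag3", 2);

    // Returns the execOrder of the first known tag found, or -1 if none is present.
    static int probe(Path xmlFile) throws Exception {
        XMLInputFactory factory = XMLInputFactory.newInstance();
        // Do not process DTDs or external entities while probing.
        factory.setProperty(XMLInputFactory.SUPPORT_DTD, false);
        factory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, false);
        try (InputStream in = Files.newInputStream(xmlFile)) {
            XMLStreamReader reader = factory.createXMLStreamReader(in);
            try {
                while (reader.hasNext()) {
                    if (reader.next() == XMLStreamConstants.START_ELEMENT) {
                        Integer order = EXEC_ORDER.get(reader.getLocalName());
                        if (order != null) {
                            return order; // stop early, the rest of the file is not read
                        }
                    }
                }
            } finally {
                reader.close();
            }
        }
        return -1;
    }
}
```

Because the file stays on disk, it can be opened again later for the actual processing without any stream reset.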
Upvotes: 1
Reputation: 55525
The resequencer is a two-leg (stateful) pattern and will cause the original exchange to be done beforehand, as it keeps a copy in memory while resequencing until the gap is filled, and then sends the messages out in the new order.
Since your input stream comes from some HTTP service, it would be closed before the resequencer can output the exchange.
Either do as suggested to store to local disk first, and then let the resequencer work on that, or find a way not to use the resequencer.
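As a rough illustration of the "store to disk first" option, the sketch below reorders a batch of lightweight file references instead of open streams, so nothing in the batch can be closed or deleted underneath it. It is plain Java, not the Camel resequencer; PathResequencer and Entry are hypothetical names, and the sort order simply follows the execOrder values from the question.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: resequence references to spooled files, not live streams.
class PathResequencer {

    // A file already written to disk plus the order derived from probing it.
    static final class Entry {
        final Path file;
        final int execOrder;

        Entry(Path file, int execOrder) {
            this.file = file;
            this.execOrder = execOrder;
        }
    }

    // Returns the files of one batch sorted by execOrder.
    // List.sort is stable, so entries with equal execOrder keep their arrival order.
    static List<Path> resequence(List<Entry> batch) {
        List<Entry> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparingInt(e -> e.execOrder));
        List<Path> out = new ArrayList<>();
        for (Entry e : sorted) {
            out.add(e.file);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> batch = new ArrayList<>();
        batch.add(new Entry(Paths.get("a.xml"), 3));
        batch.add(new Entry(Paths.get("b.xml"), 1));
        batch.add(new Entry(Paths.get("c.xml"), 2));
        System.out.println(resequence(batch));
    }
}
```

Each file is opened only when its turn comes, so memory use stays independent of file size.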
Upvotes: 2