Reputation: 2110
I need to read from a directory and process all XML files in it. Each file is processed and split according to some rule, which results in N new messages/files. I would like to aggregate all the new documents into an index file with one line per file. Once the batch has been processed, I will call a webservice and tell it to fetch the index file.
The problem I'm facing is that Camel doesn't see the delivery of the files as one job, so my aggregation gets called multiple times, resulting in multiple index files instead of one. I have tried using the Exchange.BATCH_SIZE property and multiplying it by Exchange.SPLIT_SIZE for the aggregation completionSize.
Pseudo-Code:
from("file://" + SOURCE_FOLDER)
    .threads(10)
    .convertBodyTo(Data.class)
    .process(myProcessor)
    .split(xpath(MAIN_NODE))
    .parallelProcessing()
    .to(MyRouter.ENDPOINT)
    .setProperty(TOTAL)
        .spel(String.format("%s * %s", Exchange.SPLIT_SIZE, FileRouter.NUM_FILES))
    .aggregate(constant(true), myAggregator)
    ....
So the question is: How can I define the boundaries of the exchange / group it? I know that there will only be 1 file delivery at a time but how can I tell Camel that?
I have tried working with only 1 thread but it seems to me it has no effect on this matter.
I have the option of the delivery arriving as tar.gz - would that be easier? To me it seems I'd have the same issue.
from("file://" + SOURCE_FOLDER + "/tar.gz?consumer.delay=1000&noop=true")
    .streamCaching()
    .unmarshal()
        .gzip()
    .split(new TarSplitter())
    ....<SAME ISSUE>
I'm using the Camel 3 preview, though I have tried the latest 2.x version as well.
Thanks in advance and best,
Upvotes: 2
Views: 3254
Reputation: 7035
If I understand you correctly, your problem is that you are not able to create one aggregated index per input file.
The Camel Splitter EIP can be combined with an aggregation strategy.
If you do this, the Splitter re-aggregates all parts of a previously split message into a new aggregate.
// a Splitter with an AggregationStrategy
.split([yourSplitCriteria], new MyAggregationStrategy())
    // you can do whatever you want in here with the message parts
    // for example, each split message part is sent to this bean
    .to("bean:PartProcessor")
// end the split to re-aggregate the message parts
.end()
// here, after the end of the split, you get the re-aggregated message
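What MyAggregationStrategy does for an index file can be sketched in plain Java, without any Camel dependency. In Camel, the strategy's aggregate(Exchange oldExchange, Exchange newExchange) method is called once per part, and oldExchange is null for the first part. The class below only mirrors that folding on strings; IndexLines, append and buildIndex are hypothetical names chosen for illustration, and the assumption that each part contributes one file name per index line is taken from the question, not from Camel.

```java
import java.util.List;

// Plain-Java sketch of the folding an AggregationStrategy performs.
// IndexLines, append and buildIndex are hypothetical names, not Camel API.
final class IndexLines {

    private IndexLines() {
    }

    // Mirrors the shape of aggregate(oldExchange, newExchange):
    // 'index' is null for the first part, just as oldExchange would be.
    static String append(String index, String fileName) {
        if (index == null) {
            return fileName;
        }
        return index + "\n" + fileName;
    }

    // Folds all parts of one split into a single index body.
    // Returns null when there are no parts at all.
    static String buildIndex(List<String> fileNames) {
        String index = null;
        for (String fileName : fileNames) {
            index = append(index, fileName);
        }
        return index;
    }

    public static void main(String[] args) {
        // Prints one line per part, in the order the parts arrived.
        System.out.println(buildIndex(List.of("part-1.xml", "part-2.xml", "part-3.xml")));
    }
}
```

In a real strategy, the body of append would sit inside aggregate, reading the file name from newExchange and writing the combined text back to the exchange that is returned. The message that leaves .end() would then carry the whole index for that one input file.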
Upvotes: 3