Reputation: 821
I have route consuming a file location (recursive) and a route processing files at another file location. Sometimes the first route might find multiple files.
from(fileLocation)
.autoStartup(true)
.routeId("file-scanner")
.to(newFileLocation)
.to("direct:processFile")
from("direct:processFile")
.routeId("file-processor")
.bean(*doing some stuff with new file location*)
So what ends up being the case sometimes is that file-scanner
copies one file, file-processor
process the file, and then file-scanner
copies one more file and then it runs again.
What I essentially want is that file-scanner
copies all the files before file-processor
starts to process the files. In that way, I can run the processing only once.
The fileLocation
I consume from is defined with a config looking like this:
recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none
Upvotes: 0
Views: 1428
Reputation: 1771
All decisions revolve around batch consumer exchange properties. I think, you can implement two very different solution:
Solution, based on aggregator integration pattern. You need to aggregate all files from batch into, maybe, list of strings, because your files contains JSON. Aggregator option "completionFromBatchConsumer" can help you to aggregate all files consumed from a File endpoint in that given poll. After aggregation you can process aggregated files all together. Maybe you can develop custom aggregation strategy that will implement the logic of your bean, marked "doing some stuff with new file location".
Trigger, based on controlbus integration pattern:
from(fileLocation) .autoStartup(true) .routeId("file-scanner") .to(newFileLocation) .choice().when(exchangeProperty("CamelBatchComplete")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getContext().startRoute("file-processor"); } }) .end(); from(newFileLocation). .routeId("file-processor") .autoStartup(false) .bean(*doing some stuff with new file location*) .choice().when(exchangeProperty("CamelBatchComplete")) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { exchange.getContext().stopRoute("file-processor"); } }) .end();
Upvotes: 1