Ole-M

Reputation: 821

Wait for all files to be consumed before triggering next route

I have a route consuming from a file location (recursively) and a route processing files at another file location. Sometimes the first route finds multiple files.

from(fileLocation)
 .autoStartup(true)
 .routeId("file-scanner")
 .to(newFileLocation)
 .to("direct:processFile")

from("direct:processFile")
 .routeId("file-processor")
 .bean(*doing some stuff with new file location*)

What sometimes happens is that file-scanner copies one file, file-processor processes it, and then file-scanner copies another file and file-processor runs again.

What I essentially want is for file-scanner to copy all the files before file-processor starts processing them. That way, I can run the processing only once.

The fileLocation I consume from is defined with a config looking like this:

recursive=true&noop=true&idempotent=false&delay=3600000&include=.*.json&autoCreate=false&startingDirectoryMustExist=true&readLock=none

Upvotes: 0

Views: 1428

Answers (1)

Alexey Yakunin

Reputation: 1771

Both approaches rely on the exchange properties set by the batch consumer. I think you can implement two very different solutions:

  • A solution based on the Aggregator integration pattern. Aggregate all the files from one batch into, say, a list of strings, since your files contain JSON. The aggregator option "completionFromBatchConsumer" completes the aggregation once all files consumed from a File endpoint in a given poll have arrived. You can then process the aggregated files together. You could also write a custom aggregation strategy that implements the logic of your bean, marked "doing some stuff with new file location".

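  • A trigger based on the control bus integration pattern: start the file-processor route once the scanner's batch is complete, and stop it again after it has processed its own batch: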


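    from(fileLocation)
        .autoStartup(true)
        .routeId("file-scanner")
        .to(newFileLocation)
        // CamelBatchComplete is true only on the last file of the current poll
        .choice().when(exchangeProperty("CamelBatchComplete"))
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // All files are copied, so start the processing route.
                    // On Camel 3.x and later: getContext().getRouteController().startRoute(...)
                    exchange.getContext().startRoute("file-processor");
                }
            })
        .end();

    from(newFileLocation)
        .routeId("file-processor")
        // Stays stopped until file-scanner starts it
        .autoStartup(false)
        .bean(*doing some stuff with new file location*)
        .choice().when(exchangeProperty("CamelBatchComplete"))
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    // Stopping a route from one of its own exchanges can block until the
                    // shutdown timeout; the Camel FAQ suggests stopping from a separate thread.
                    exchange.getContext().stopRoute("file-processor");
                }
            })
        .end();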
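
For comparison, the first (aggregator) option might look roughly like the sketch below. It assumes Camel 3.x or later, where AggregationStrategy lives in org.apache.camel. The endpoint URIs, the class names BatchFileRoutes and FileNameListStrategy, and the "direct:processFiles" endpoint are hypothetical placeholders, not taken from the question. The strategy collects file names rather than file contents, which is only one possible choice.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;

public class BatchFileRoutes extends RouteBuilder {

    // Hypothetical URIs; substitute the real fileLocation and newFileLocation.
    private static final String FILE_LOCATION =
            "file:/data/in?recursive=true&noop=true&idempotent=false";
    private static final String NEW_FILE_LOCATION = "file:/data/out";

    @Override
    public void configure() {
        from(FILE_LOCATION)
            .routeId("file-scanner")
            .to(NEW_FILE_LOCATION)
            // constant(true) puts every exchange into the same group; the group
            // completes when the file consumer reports the end of its poll batch.
            .aggregate(constant(true), new FileNameListStrategy())
                .completionFromBatchConsumer()
                .to("direct:processFiles");

        from("direct:processFiles")
            .routeId("file-processor")
            // Runs once per poll; the body is the list of copied file names.
            .log("Processing batch: ${body}");
    }

    /** Collects the CamelFileName header of each exchange into a List<String> body. */
    static class FileNameListStrategy implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            String name = newExchange.getIn().getHeader(Exchange.FILE_NAME, String.class);
            if (oldExchange == null) {
                // First file of the batch: start a new list.
                List<String> names = new ArrayList<>();
                names.add(name);
                newExchange.getIn().setBody(names);
                return newExchange;
            }
            @SuppressWarnings("unchecked")
            List<String> names = oldExchange.getIn().getBody(List.class);
            names.add(name);
            return oldExchange;
        }
    }
}
```

With this shape the processing step is invoked once per poll instead of once per file. A poll that finds no files produces no exchange, so nothing is triggered in that case.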

Upvotes: 1
