Reputation: 31
String processFiles = "file://somedirectory?readLock=rename&preMove=inprogress/&move=../processed/&moveFailed=../error/";
String postProcessor = "file://somedirectory/inprogress";

from(processFiles)
    .threads(10)
    .routeId("someId")
    .to("bean:somebean");

from(postProcessor)
    .routeId("postProcress")
    .to("bean:postProcessorBean");
I am reading files from a certain location using multiple threads. Each file is then processed in "somebean".
Now I want to do some post-processing once all the threads have completed the first route, but I don't understand when exactly to call the post-processor. The approach above gives incorrect results: the post-processor is called before "somebean" has been completed by all the threads. How do I call the post-processor only once all the threads have finished "somebean"?
To state the problem:
1. We have a number of files, each with millions of records, in one location. We need to read those files and save the data in a database.
2. Once that is done, we update a status in another table.
A solution is already in place, but it currently takes too long. Hence we tried using threads at the Camel route level so that multiple files can be processed simultaneously. This reduces the processing time, but we are no longer able to do the post-processing (i.e. step 2).
Upvotes: 3
Views: 15978
Reputation: 7646
Use the aggregator pattern:
from("file:src/main/resources/data/parallel-file-processing?noop=true")
    .threads(10)
    .process(new PreProcessor())
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionFromBatchConsumer()
    .process(new PostProcessor());
PreProcessor implements the Processor interface and does the time-consuming work in 10 separate threads. The results of the pre-processing are combined by aggregate using ArrayListAggregationStrategy, taken from the Camel aggregator doc:
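import java.util.ArrayList;
import java.util.List;
import org.apache.camel.Exchange;
// Package as in Camel 2.x; in Camel 3.x and later the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Simply combines Exchange body values into an ArrayList<Object>
public class ArrayListAggregationStrategy implements AggregationStrategy {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        List<Object> list = null;
        if (oldExchange == null) {
            // First exchange of the group: start a new list
            list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        } else {
            // Later exchanges: append to the list created by the first call
            list = oldExchange.getIn().getBody(List.class);
            list.add(newBody);
            return oldExchange;
        }
    }
}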
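To make the contract of aggregate easier to see, here is a minimal sketch of the same fold in plain Java, with no Camel types. ListFoldSketch and its methods are hypothetical names used only for illustration: the accumulator is null on the first call, and every later call appends to the list created by the first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the strategy above, using plain lists instead of Camel Exchanges.
final class ListFoldSketch {

    // Mirrors aggregate(oldExchange, newExchange): 'acc' is null on the first call only.
    static List<Object> aggregate(List<Object> acc, Object newBody) {
        if (acc == null) {
            acc = new ArrayList<>();
        }
        acc.add(newBody);
        return acc;
    }

    // Feeds the bodies through aggregate() one at a time, as an aggregator would.
    // Returns null if there were no bodies, because aggregate() is never called.
    static List<Object> foldAll(List<?> bodies) {
        List<Object> acc = null;
        for (Object body : bodies) {
            acc = aggregate(acc, body);
        }
        return acc;
    }

    public static void main(String[] args) {
        System.out.println(foldAll(Arrays.asList("a.csv", "b.csv", "c.csv")));
    }
}
```

In the real route the "bodies" are the exchanges leaving PreProcessor, and the final list is what the step after the aggregator receives.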
PostProcessor (or a bean) receives a List that can be post-processed:
private static class PostProcessor implements Processor {
    @SuppressWarnings("unchecked")
    @Override
    public void process(final Exchange exchange) throws Exception {
        final Object body = exchange.getIn().getBody();
        final List<GenericFile<File>> list = (List<GenericFile<File>>) body;
        for (final GenericFile<File> genericFile : list) {
            LOG.info("file = " + genericFile.getAbsoluteFilePath());
        }
    }
}
Finally, the post-processing can also be done in parallel by enabling parallelProcessing() on the aggregator:
from("file:src/main/resources/data/parallel-file-processing?noop=true")
    .threads(10)
    .process(new PreProcessor())
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionFromBatchConsumer()
    .parallelProcessing() // <- for parallel processing
    .process(new PostProcessor());
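For comparison, the fan-out/fan-in shape that the route expresses (process every file on a pool of 10 threads, then run one step over all the results) can be sketched in plain Java with java.util.concurrent. This is not how Camel implements it, only an illustration of the ordering guarantee being asked for. FanInSketch, processFile and postProcess are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class FanInSketch {
    // Counts how often the post step ran, so the "exactly once" property can be checked.
    static final AtomicInteger POST_RUNS = new AtomicInteger();

    // Placeholder for the slow per-file work (the role PreProcessor plays above).
    static String processFile(String name) {
        return name + ":done";
    }

    // Placeholder for the step that must run once, after all files are processed.
    static int postProcess(List<String> results) {
        POST_RUNS.incrementAndGet();
        return results.size();
    }

    static int run(List<String> files) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String file : files) {
                tasks.add(() -> processFile(file));
            }
            // invokeAll blocks until every task has finished (fan-in point).
            List<String> results = new ArrayList<>();
            for (Future<String> done : pool.invokeAll(tasks)) {
                results.add(done.get());
            }
            return postProcess(results);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("file processing failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The aggregator with completionFromBatchConsumer() plays the role of invokeAll here: nothing downstream runs until the whole batch has arrived.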
Upvotes: 4
Reputation: 21015
Use an aggregator to rejoin the messages after a configurable number of messages or a timeout.
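In the Camel DSL this corresponds to options such as completionSize and completionTimeout on the aggregator. As a rough illustration of the completion rule itself, here is a plain-Java sketch that releases a batch either when it reaches a size limit or when no new item has arrived for a given time. SizeOrTimeoutBatcher is a hypothetical helper, not a Camel class, and the clock is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: completes a batch on size, or on inactivity timeout.
final class SizeOrTimeoutBatcher<T> {
    private final int maxSize;
    private final long timeoutMillis;
    private final List<T> pending = new ArrayList<>();
    private long lastArrival;

    SizeOrTimeoutBatcher(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    // Adds an item; returns the completed batch once maxSize is reached, else null.
    List<T> add(T item, long nowMillis) {
        pending.add(item);
        lastArrival = nowMillis;
        return pending.size() >= maxSize ? drain() : null;
    }

    // Called periodically; returns the pending batch if it has been idle
    // for at least timeoutMillis, else null.
    List<T> poll(long nowMillis) {
        boolean idle = !pending.isEmpty() && nowMillis - lastArrival >= timeoutMillis;
        return idle ? drain() : null;
    }

    // Hands out a copy of the pending items and resets the buffer.
    private List<T> drain() {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

A size limit alone can leave a final, partly filled batch waiting forever, which is why a timeout is usually configured alongside it.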
Upvotes: 1