user3447812

Reputation: 31

Apache Camel: Multithreading in route

String processFiles = "file://somedirectory?readLock=rename&preMove=inprogress/&move=../processed/&moveFailed=../error/";

String postProcessor = "file://somedirectory/inprogress";

from(processFiles)
    .threads(10)
    .routeId("someId")
    .to("bean:somebean");

from(postProcessor)
    .routeId("postProcress")
    .to("bean:postProcessorBean");

This reads files from a certain location using multiple threads. The processing is then done in "somebean".

Now I want to do some post-processing once all the threads have completed the first route, but I don't understand when exactly to call the post-processor. The way I have done it above gives incorrect results: postProcessor is called before "somebean" has completed in all the threads. How do I call the post-processing only once all the threads have finished "somebean"?

To give the problem statement:
1. We have a number of files, each with millions of records, in one location. We need to read those files and save the data in a database.
2. Once that is done, update the status in another table.

A solution is already in place, but it currently takes too long. Hence we tried to use threads at the Camel route level so that multiple files can be processed simultaneously. This reduces the time, but we are not able to do the post-processing (i.e. step 2).

Upvotes: 3

Views: 15978

Answers (2)

Peter Keller

Reputation: 7646

Use the aggregator pattern:

from("file:src/main/resources/data/parallel-file-processing?noop=true")
    .threads(10)
    .process(new PreProcessor())
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionFromBatchConsumer()
    .process(new PostProcessor());

PreProcessor implements the Processor interface and does the time-consuming work in 10 separate threads.

The results of the pre-processing are combined by aggregate(), using the ArrayListAggregationStrategy taken from the Camel aggregator documentation:

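import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
// Camel 2.x location; in Camel 3.x and later use org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Simply combines Exchange body values into an ArrayList<Object>
public class ArrayListAggregationStrategy implements AggregationStrategy {
    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object newBody = newExchange.getIn().getBody();
        if (oldExchange == null) {
            // First exchange of the group: start a new list
            List<Object> list = new ArrayList<Object>();
            list.add(newBody);
            newExchange.getIn().setBody(list);
            return newExchange;
        }
        // Later exchanges: append to the list held by the aggregated exchange
        @SuppressWarnings("unchecked")
        List<Object> list = oldExchange.getIn().getBody(List.class);
        list.add(newBody);
        return oldExchange;
    }
}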
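To see what completionFromBatchConsumer() adds on top of this strategy, here is a simplified, hypothetical model in plain Java (no Camel dependency). As far as I understand it, the file consumer marks each exchange with the size of the current poll (the CamelBatchSize exchange property), and the aggregator completes the group once it has aggregated that many exchanges, no matter which thread delivered them. BatchAggregatorModel and its methods are made-up names for illustration; this is a sketch of the completion rule, not Camel's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of aggregate(constant(true), ArrayListAggregationStrategy)
// + completionFromBatchConsumer(). Not Camel code, only the completion rule.
final class BatchAggregatorModel<T> {
    private final Consumer<List<T>> onComplete; // stand-in for the PostProcessor
    private List<T> group;                       // single group: the correlation key is constant

    BatchAggregatorModel(Consumer<List<T>> onComplete) {
        this.onComplete = onComplete;
    }

    // May be called concurrently by any worker thread; batchSize mirrors
    // the number of files the file consumer picked up in one poll.
    void aggregate(T body, int batchSize) {
        List<T> completed = null;
        synchronized (this) {
            if (group == null) {
                group = new ArrayList<>(); // like oldExchange == null: start a new list
            }
            group.add(body);
            if (group.size() >= batchSize) {
                completed = group;
                group = null; // the next poll starts a fresh group
            }
        }
        if (completed != null) {
            onComplete.accept(completed); // fires once, after the last exchange of the batch
        }
    }
}
```

Because the completed list is handed on only when the count reaches the batch size, the post-processing step cannot run while some threads are still working on files from the same poll.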

PostProcessor (or a bean) receives a List that can be post-processed:

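import java.io.File;
import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Receives the List built by the aggregator once the whole batch is complete
public class PostProcessor implements Processor {
    private static final Logger LOG = LoggerFactory.getLogger(PostProcessor.class);

    @SuppressWarnings("unchecked")
    @Override
    public void process(final Exchange exchange) throws Exception {
        final List<GenericFile<File>> list = (List<GenericFile<File>>) exchange.getIn().getBody();
        for (final GenericFile<File> genericFile : list) {
            LOG.info("file = " + genericFile.getAbsoluteFilePath());
        }
    }
}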

Finally, the completed aggregates can also be handed to the post-processing step in parallel (using a thread pool) by adding parallelProcessing():

from("file:src/main/resources/data/parallel-file-processing?noop=true")
    .threads(10)
    .process(new PreProcessor())
    .aggregate(constant(true), new ArrayListAggregationStrategy())
    .completionFromBatchConsumer()
    .parallelProcessing()  // <- for parallel processing
    .process(new PostProcessor());
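For comparison, the same "fan out, then join" idea can be sketched without Camel using java.util.concurrent: every file is processed on a 10-thread pool, and the post-processing step runs exactly once, only after invokeAll has returned for all tasks. FanOutJoin, processAllThenPost, and the two functions passed in (stand-ins for "somebean" and the post-processor bean) are hypothetical names; in the Camel route above, the aggregator plays the role of this join point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Plain-Java analogy of the route (not Camel): process all files on 10 threads,
// then run the post-processing step exactly once, after every file is done.
final class FanOutJoin {

    static <R> R processAllThenPost(List<String> files,
                                    Function<String, String> processFile,      // stand-in for "somebean"
                                    Function<List<String>, R> postProcess) {   // stand-in for the post-processor
        ExecutorService pool = Executors.newFixedThreadPool(10);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String file : files) {
                tasks.add(() -> processFile.apply(file));
            }
            // invokeAll blocks until every task has completed; futures keep the input order
            List<Future<String>> futures = pool.invokeAll(tasks);
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // rethrows a failed task as ExecutionException
            }
            // Join point: only reached once all files have been processed
            return postProcess.apply(results);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for file tasks", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A file task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

The key design point is the same as in the route: the post-processing is triggered by "all work for this batch is finished", not by a separate poller that might see files while they are still being processed.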

Upvotes: 4

Ben ODay

Reputation: 21015

Use an aggregator to rejoin the messages after a configurable number of messages or a timeout.
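In Camel this corresponds to the aggregate options completionSize(...) and completionTimeout(...), the latter being an inactivity timeout. The completion rule itself can be modelled in plain Java as below; SizeOrTimeoutBatcher and nextBatch are hypothetical names, and this is a sketch of the rule, not Camel's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "complete on size OR timeout"
// (compare Camel's completionSize / completionTimeout). Not Camel code.
final class SizeOrTimeoutBatcher {

    // Collects up to maxSize items; returns early if no new item arrives
    // within idleTimeoutMillis (an inactivity timeout).
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxSize, long idleTimeoutMillis)
            throws InterruptedException {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxSize) {
            T item = queue.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                break; // timed out: emit whatever has been collected so far
            }
            batch.add(item);
        }
        return batch;
    }
}
```

With five records queued, nextBatch(queue, 3, 100) returns the first three as soon as the size limit is reached; the next call returns the remaining two after roughly 100 ms without new input.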

Upvotes: 1
