Rodrigo Sasaki

Reputation: 7226

How do I aggregate file content correctly with Apache Camel?

I am writing a tool to parse some very big files, and I am implementing it using Camel. I have used Camel for other things before and it has served me well.

I am doing an initial Proof of Concept on processing files in streaming mode, because if I try to run a file that is too big without it, I get a java.lang.OutOfMemoryError.

Here is my route configuration:

@Override
public void configure() throws Exception {
    from("file:" + from)
            // split the file into lines and stream them, so the whole file is never held in memory
            .split(body().tokenize("\n")).streaming()
            .bean(new LineProcessor())
            // group the processed lines back together, keyed by the original file name
            .aggregate(header(Exchange.FILE_NAME_ONLY), new SimpleStringAggregator())
            .completionTimeout(150000)
            .to("file://" + to)
            .end();
}

from points to the directory where my test file is.

to points to the directory where I want the file to go after processing.
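
For completeness, this is roughly how the route builder is wired up; the class name and constructor here are just placeholders to show where from and to come from, not my actual code:

import org.apache.camel.builder.RouteBuilder;

public class FileParsingRoute extends RouteBuilder {

    // hypothetical names, just to illustrate the two endpoint directories
    private final String from; // e.g. "/path_to_input"
    private final String to;   // e.g. "/path_to_output"

    public FileParsingRoute(String from, String to) {
        this.from = from;
        this.to = to;
    }

    // configure() is the method shown above
}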

With that approach I could parse files that had up to hundreds of thousands of lines, so it's good enough for what I need. But I'm not sure the file is being aggregated correctly.

If I run cat /path_to_input/file I get this:

Line 1
Line 2
Line 3
Line 4
Line 5

Now if I run cat /path_to_output/file on the output directory, I get this:

Line 1
Line 2
Line 3
Line 4
Line 5%

I think this is probably a simple thing, but I don't know how to solve it. Both files also have slightly different byte sizes.
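
For reference, here is a quick standalone check I could run to see whether the size difference is just a missing trailing newline; the class name is made up and the paths match the cat commands above:

import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TrailingNewlineCheck {

    public static void main(String[] args) throws Exception {
        check(Paths.get("/path_to_input/file"));
        check(Paths.get("/path_to_output/file"));
    }

    private static void check(Path path) throws Exception {
        long size = Files.size(path);
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "r")) {
            raf.seek(size - 1); // jump to the last byte without reading the whole file
            boolean endsWithNewline = raf.read() == '\n';
            System.out.println(path + ": " + size + " bytes, ends with \\n: " + endsWithNewline);
        }
    }
}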

Here is my LineProcessor class:

public class LineProcessor implements Processor {

    @Override
    public void process(Exchange exchange) throws Exception {
        // for the POC the "processing" is just printing each line
        String line = exchange.getIn().getBody(String.class);
        System.out.println(line);
    }

}

And my SimpleStringAggregator class:

public class SimpleStringAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {

        // the first exchange in a group has no previous exchange to merge into
        if (oldExchange == null) {
            return newExchange;
        }

        // join the lines collected so far with the newly arrived line
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        String body = oldBody + "\n" + newBody;

        oldExchange.getIn().setBody(body);

        return oldExchange;
    }

}

Maybe I shouldn't even worry about this, but I would just like to have it working perfectly since this is just a POC before I get to the real implementation.

Upvotes: 0

Views: 1800

Answers (2)

Namphibian

Reputation: 12221

The answer from 0X00me is probably correct; however, you are probably doing unneeded work.

I assume you are using a version of Camel higher than 2.3, in which case you can drop the aggregation implementation completely, because according to the Camel documentation:

Camel 2.3 and newer:

The Splitter will by default return the original input message.

Change your route to something like this (I can't test it):

@Override
public void configure() throws Exception {
    from("file:" + from)
            .split(body().tokenize("\n")).streaming()
            .bean(new LineProcessor())
            .to("file://" + to)
            .end();
}

If you need to do custom aggregation, then you need to implement the aggregator yourself. I process files this way daily and always end up with exactly what I started with.
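
In the custom case, the strategy can also be handed directly to the splitter, which then uses it to build the outgoing exchange from the processed lines before the route continues. A rough sketch (again, I can't test it):

from("file:" + from)
        .split(body().tokenize("\n"), new SimpleStringAggregator()).streaming()
            .bean(new LineProcessor())
        .end()
        .to("file://" + to);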

Upvotes: 1

0x0me

Reputation: 754

It looks like your input file's last character is a line break. You split the file on \n and add it back in the aggregator, except after the last line. Because no line follows it, the terminator \n is missing from the last line. One solution might be adding the \n in advance:

String body = oldBody + "\n" + newBody + "\n";
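
Applied to the whole strategy, a minimal sketch of that idea (my own variation: the terminator is appended to every incoming line, including the first, so it is not doubled between segments that already end in \n):

public class SimpleStringAggregator implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // append the line terminator to every incoming line, including the first one
        String newBody = newExchange.getIn().getBody(String.class) + "\n";

        if (oldExchange == null) {
            newExchange.getIn().setBody(newBody);
            return newExchange;
        }

        String oldBody = oldExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + newBody);
        return oldExchange;
    }
}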

Upvotes: 3
