Sumit

Reputation: 55

Apache Camel: How to use "done" files to signal that writing records to a file is finished so it can be moved

As the title suggests, I want to move a file into a different folder after I am done writing DB records to it. I have already looked into several questions related to this: Apache camel file with doneFileName

But my problem is a little different, since I am using split, streaming and parallelProcessing for fetching the DB records and writing them to a file. I cannot figure out when and how to create the done file alongside the parallelProcessing. Here is the code snippet:

My route to fetch records and write it to a file:

from(<ROUTE_FETCH_RECORDS_AND_WRITE>)
        .setHeader(Exchange.FILE_PATH, constant("<path to temp folder>"))
        .setHeader(Exchange.FILE_NAME, constant("<filename>.txt"))
        .setBody(constant("<sql to fetch records>&outputType=StreamList"))
        .to("jdbc:<endpoint>")
        .split(body(), <aggregation>).streaming().parallelProcessing()
            .<some processors>
            .aggregate(header(Exchange.FILE_NAME), (o, n) -> {
                <file aggregation>
                return o;
            }).completionInterval(<some time interval>)
                .toD("file://<to the temp file>")
            .end()
        .end()
        .to("file:"+<path to temp folder>+"?doneFileName=${file:header."+Exchange.FILE_NAME+"}.done"); //this line is just for trying out done filename 

In my aggregation strategy for the splitter I have code that counts the records processed and prepares the response that is sent back to the caller. In the outer aggregate I have code that aggregates the DB rows and then writes them to the file.
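Roughly, stripped of the Camel API, the merge logic of that splitter strategy does something like the sketch below (all names are illustrative, not my actual code; in Camel it would live inside AggregationStrategy.aggregate(oldExchange, newExchange)). Note that its summary String is exactly the kind of body that shows up in the stack trace further down:

```java
import java.util.ArrayList;
import java.util.List;

// Camel-free sketch of the merge step a splitter AggregationStrategy performs.
// Names are illustrative; in Camel this logic would sit inside
// AggregationStrategy.aggregate(Exchange oldExchange, Exchange newExchange).
class RecordAccumulator {
    private final List<String> records = new ArrayList<>();

    // Absorb one split part, like merging newExchange into oldExchange.
    RecordAccumulator aggregate(String newRecord) {
        records.add(newRecord);
        return this;
    }

    int count() {
        return records.size();
    }

    // The response body the caller would get back after the split completes.
    String summary() {
        return "Total number of records discovered: " + records.size();
    }
}
```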

And here is the file listener for moving the file:

from("file://<path to temp folder>?delete=true&include=<filename>.*.TXT&doneFileName=done")
.to("file://<final filename with path>?fileExist=Append");

Doing something like this is giving me this error:

     Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot store file: <folder-path>/filename.TXT] org.apache.camel.component.file.GenericFileOperationFailedException: Cannot store file: <folder-path>/filename.TXT
    at org.apache.camel.component.file.FileOperations.storeFile(FileOperations.java:292)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.component.file.GenericFileProducer.writeFile(GenericFileProducer.java:277)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.component.file.GenericFileProducer.processExchange(GenericFileProducer.java:165)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.component.file.GenericFileProducer.process(GenericFileProducer.java:79)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:141)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:460)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:121)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:190)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:298)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:207)[209:org.apache.camel.camel-core:2.16.2]
    at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:154)[209:org.apache.camel.camel-core:2.16.2]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)[:1.8.0_144]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)[:1.8.0_144]
    at java.lang.Thread.run(Thread.java:748)[:1.8.0_144]
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.io.InputStream but has value: Total number of records discovered: 5

What am I doing wrong? Any inputs will help.

PS: Newly introduced to Apache Camel

Upvotes: 0

Views: 1937

Answers (1)

burki

Reputation: 7005

I would guess that the error comes from .toD("file://<to the temp file>") trying to write a file but finding the wrong type of body (the String "Total number of records discovered: 5" instead of an InputStream).

I don't understand why you have one file destination inside the splitter and one outside of it.

As @claus-ibsen suggested, try to remove the extra .aggregate(...) from your route. To split and re-aggregate, it is sufficient to reference the aggregation strategy in the splitter. Claus also pointed to an example in the Camel docs.

from(<ROUTE_FETCH_RECORDS_AND_WRITE>)
    .setHeader(Exchange.FILE_PATH, constant("<path to temp folder>"))
    .setHeader(Exchange.FILE_NAME, constant("<filename>.txt"))
    .setBody(constant("<sql to fetch records>&outputType=StreamList"))
    .to("jdbc:<endpoint>")
    .split(body(), <aggregationStrategy>)
        .streaming().parallelProcessing()
        // the processors below get individual parts 
        .<some processors>
    .end()
    // The end statement above ends split-and-aggregate. From here 
    // you get the re-aggregated result of the splitter.
    // So you can simply write it to a file and also write the done-file
    .to(...);

However, if you need to control the aggregation sizes, you have to combine splitter and aggregator. That would look something like this:

from(<ROUTE_FETCH_RECORDS_AND_WRITE>)
    .setHeader(Exchange.FILE_PATH, constant("<path to temp folder>"))
    .setHeader(Exchange.FILE_NAME, constant("<filename>.txt"))
    .setBody(constant("<sql to fetch records>&outputType=StreamList"))
    .to("jdbc:<endpoint>")
    // No aggregationStrategy here so it is a standard splitter
    .split(body())
        .streaming().parallelProcessing()
        // the processors below get individual parts 
        .<some processors>
    .end()
    // The end statement above ends split. From here 
    // you still got individual records from the splitter.
    .to("seda:aggregate");

// new route to do the controlled aggregation
from("seda:aggregate")
    // constant(true) is the correlation predicate => collect all messages in 1 aggregation
    .aggregate(constant(true), new YourAggregationStrategy())
        .completionSize(500)
    // not sure if this 'end' is needed
    .end()
    // write files with 500 aggregated records here
    .to("...");
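Camel-free, the effect of completionSize can be sketched like this (class and method names are made up for illustration): the aggregator buffers incoming records and releases a completed batch each time the buffer reaches the configured size, which is what lets you write one file per 500 records:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrates what completionSize(N) does in the aggregator above:
// records accumulate until N have arrived, then the batch is "completed"
// (in Camel, the completed aggregate would be sent on to the file endpoint).
class SizeBatcher {
    private final int completionSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> completed = new ArrayList<>();

    SizeBatcher(int completionSize) {
        this.completionSize = completionSize;
    }

    void add(String record) {
        buffer.add(record);
        if (buffer.size() == completionSize) {
            // Batch is full: release it and start collecting the next one.
            completed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<String>> completedBatches() {
        return completed;
    }
}
```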

Upvotes: 2
