Reputation: 131
I am trying to process about 7 million rows daily from an Informix table with Apache Camel but I can't figure out how it can be accomplished.
My first attempt, which worked with a very small data set (about 50k rows), used .split(body()).parallelProcessing()
like so:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing them in a synchronized list
.end().to("direct:transform-data");
from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();
This of course resulted in an "OutOfMemory" error when I tried it with 500k rows on .bean(QueryTable.class, "queryData").split(body()).parallelProcessing(), because it first tried caching all of the data from the query before parsing it. I tried setting fetchSize to something like 100 but I got the same error, and using maxRows would only get me the number of rows I specified and ignore the rest.
My next attempt was using one of Camel's components, like sql-component or jdbc, and trying to use a Splitter to process each row in a separate thread, but I got the exact same problem.
sql:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same
jdbc:
from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()
My last attempt was to use maxMessagesPerPoll for the sql component and outputType=StreamList for the jdbc component, but unfortunately the former only processes one row at a time (and it has to be a consumer to be used that way), while the latter gives me a java.sql.SQLException: Cursor not open exception.
sql:
from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component
jdbc:
.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception
The end goal is to be able to process millions of rows without consuming so much memory that the "OutOfMemory" error is triggered.
I know this question is similar to this one, but the answer doesn't really help my situation. I also noticed that the documentation for the sql component has an outputType=StreamList option for the producer, but it was introduced in version 2.18 while I am on version 2.14.1.
Any help and tips would be extremely helpful!
Thanks.
Some other info:
Apache Camel version: 2.14.1
Database: Informix
Upvotes: 3
Views: 3724
Reputation: 131
After quite a bit of research, some more trial and error, and the tips from NotaJD, I found a solution that could work (still testing). Actually, make that 2 solutions, but they differ only in their type of execution.
For the sake of explanation I will use the following info:
AggregationStrategyImpl extends AggregationStrategy with the following:
- Aggregates the records into a List<Object> in the exchange body
- The completion predicate completes when the List<Object> size reaches 50000
- The completion timeout is set to 30000 milliseconds

CustomThreadPool is a pseudo-implementation of Camel's ThreadPoolBuilder class.

Solution 1:
from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")
The code will still run on the Quartz cron timer (00:01 every day), but this time my QueryTable.class will fetch the proper query to execute (instead of SELECT *, I now specify only the columns I need) and set it on the exchange body.
.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")
The Camel jdbc component will take the query from the exchange body, set resetAutoCommit to false so it doesn't throw the Cursor not open error, set the output type to a stream list, and split-stream the execution. This way I will not query all records at once but instead fetch them one by one. Every record fetched is then transformed into a proper POJO via TransformRecord.class.
.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();
This time I use the aggregate EIP to build a list of records. The aggregationStrategyImpl contains the aggregation logic as well as the completion predicate and timeout, so when I reach a certain number of records (or a timeout occurs) the list is sent to "direct:start-processing".
More on the aggregation implementation in this Source Allies blog and in the Apache Camel Aggregate EIP docs.
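Leaving the Camel Exchange wiring aside, the completion rules the strategy encodes (size threshold or timeout, whichever comes first) can be sketched in plain Java. This is only an illustration of the batching logic; the class and method names are mine, not Camel's:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the completion rules behind the aggregation strategy:
// records accumulate in a list, and the batch "completes" once either
// the size threshold (50000 in my case) or the timeout (30000 ms) is hit.
public class BatchPolicy {
    private final int sizeThreshold;
    private final long timeoutMillis;
    private final long startedAt;
    private final List<Object> batch = new ArrayList<>();

    public BatchPolicy(int sizeThreshold, long timeoutMillis, long nowMillis) {
        this.sizeThreshold = sizeThreshold;
        this.timeoutMillis = timeoutMillis;
        this.startedAt = nowMillis;
    }

    public void add(Object record) {
        batch.add(record);
    }

    // Mirrors the completion predicate: true once the list reaches the threshold
    public boolean completeBySize() {
        return batch.size() >= sizeThreshold;
    }

    // Mirrors the completion timeout: true once the batch has been open long enough
    public boolean completeByTimeout(long nowMillis) {
        return nowMillis - startedAt >= timeoutMillis;
    }

    public List<Object> records() {
        return batch;
    }
}
```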
from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();
Here I split the list obtained and, using a custom ThreadPool, create N threads to analyze and process each record individually. This way I can process the list in parallel instead of one by one. I could have used .split(body()).parallelProcessing(), but the default ThreadPool settings might not be optimal later on.
More on the ThreadPool implementation on the Apache Camel Threading Model docs, the ThreadPool Configuration notes and the Red Hat Threading Model docs.
Solution 2: This is basically the exact same execution, but with the following changes:
// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally
What this does is send the list to be processed asynchronously, allowing up to 1 other list to be queued in memory and pausing the parent thread if the queue is full. So instead of waiting for the list of records to be processed, the parent thread goes back and collects another batch of records. This also means that if the processing route hasn't finished, the new records won't get thrown out; the parent thread will wait until it can send the batch to the SEDA in-memory queue.
More on the SEDA component in the Apache Camel SEDA component docs (on GitHub and on their site).
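The seda behavior described above can be sketched without Camel: a queue with size=1 and blockWhenFull=true acts like a bounded blocking queue, where put() pauses the producing (parent) thread until the consumer drains the previous batch, rather than discarding or erroring. Class and method names here are illustrative only:

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of seda:...?size=1&blockWhenFull=true using a bounded blocking queue:
// the producer blocks on put() while a batch is still queued, so no batch is lost.
public class SedaSketch {
    public static int handoff(int batches) throws InterruptedException {
        BlockingQueue<List<Object>> queue = new ArrayBlockingQueue<>(1);
        AtomicInteger consumed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    List<Object> batch = queue.take(); // "start-processing" side
                    if (batch.isEmpty()) break;        // empty batch used as a stop signal
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException ignored) { }
        });
        consumer.start();

        // Producer (parent thread) side: each put() waits while the queue is full
        for (int i = 0; i < batches; i++) {
            queue.put(List.of("batch-" + i));
        }
        queue.put(List.of()); // stop signal
        consumer.join();
        return consumed.get();
    }
}
```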
With solution 1 it should take much longer to complete, since it processes all the data before gathering more records from the query, but the memory consumption should be FAR less since it is controlled in the aggregation predicate.
With solution 2 it should be much faster, since it gathers the next batch of records from the query while processing the previous one, but the memory consumption will be FAR greater since it holds at most 3 lists: the one being processed, the one in the SEDA queue, and the latest batch gathered by the parent thread (paused when the queue is full).
I stated that I am still testing these solutions because with the 500k records it works, but I'm still working out the optimal ThreadPool settings for the server where this will be deployed. I have researched threading in Java, but it seems there really isn't much to go on other than the system's architecture, RAM, and trial and error.
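As a starting point before the trial and error, one common sizing heuristic (not from my testing, just a rule of thumb) scales the pool with the core count and with how much time each task spends blocked, e.g. waiting on the database in the persist step:

```java
// Heuristic: threads ≈ cores / (1 - blocking coefficient), where the
// coefficient is the fraction of time a task spends waiting (0 for
// CPU-bound analysis, closer to 0.9 for I/O-bound persistence).
// The coefficient value you pick is an assumption to tune, not a given.
public class PoolSizing {
    public static int suggestedPoolSize(double blockingCoefficient) {
        int cores = Runtime.getRuntime().availableProcessors();
        return (int) Math.max(1, cores / (1.0 - blockingCoefficient));
    }
}
```

This only gives a first guess; the actual optimum still depends on the server's RAM and the database's tolerance for concurrent connections.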
Upvotes: 6