jmhg92

Reputation: 131

Parallel processing large SQL table with Camel

I am trying to process about 7 million rows daily from an Informix table with Apache Camel, but I can't figure out how to accomplish it.

My first attempt, which worked with a very small data set (about 50k rows), used .split(body()).parallelProcessing() like so:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");

from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();

This of course resulted in an "OutOfMemory" error when I tried it with 500k rows on .bean(QueryTable.class, "queryData").split(body()).parallelProcessing(), because it first tried to cache all of the data from the query before parsing it. I tried setting fetchSize to something like 100 but got the same error, and using maxRows would only get me the number of rows I specified, ignoring the rest.

My next attempt was to use one of Camel's components, like sql-component or jdbc, with a Splitter to process each row in a separate thread, but I ran into the exact same problem.

sql:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same

jdbc:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()

My last attempt was to use maxMessagesPerPoll for the sql component and outputType=StreamList for the jdbc component, but unfortunately the former only processes one row at a time (and it has to be used as a consumer) and the latter gives me a java.sql.SQLException: Cursor not open exception.

sql:

from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component

jdbc:

.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception

The end goal is to be able to process millions of rows without consuming so much memory, so as to prevent the "OutOfMemory" error. My idea, if possible, is to do the following:

  1. Create my query on a Quartz cron trigger
  2. Obtain and group N results
  3. Send a group of results to be processed (in another thread) while another group is being obtained
  4. Repeat until all the data has been processed
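As an illustration only (outside Camel, with made-up names, not a proposed final implementation), steps 2-4 amount to a batch-and-dispatch loop like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical sketch of steps 2-4: collect rows into batches of N and hand
// each full batch to a worker pool, so the next batch can be gathered while
// the previous one is still being processed in another thread.
class BatchedPipeline {
    static int runBatches(List<Object> rows, int batchSize,
                          ExecutorService workers, Consumer<List<Object>> process) {
        int dispatched = 0;
        List<Object> batch = new ArrayList<>(batchSize);
        for (Object row : rows) {
            batch.add(row);
            if (batch.size() == batchSize) {                // step 2: group N results
                final List<Object> full = batch;
                workers.submit(() -> process.accept(full)); // step 3: process in another thread
                dispatched++;
                batch = new ArrayList<>(batchSize);         // step 4: keep gathering
            }
        }
        if (!batch.isEmpty()) {                             // flush the final partial batch
            final List<Object> rest = batch;
            workers.submit(() -> process.accept(rest));
            dispatched++;
        }
        return dispatched;
    }
}
```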

I know this question is similar to this one, but the answer doesn't really help my situation. I also noticed that the documentation for the sql component lists an outputType=StreamList option for the producer, but it was introduced in version 2.18 while I am on 2.14.1.

Any help and tips would be extremely helpful!

Thanks.

Some other info: Apache Camel Version: 2.14.1 Database: Informix

Upvotes: 3

Views: 3724

Answers (1)

jmhg92

Reputation: 131

After quite a bit of research, some more trial and error, and the tips from NotaJD, I found a solution that could work (still testing). Actually, make that two solutions, but they differ only in their type of execution.

Info:

For the sake of explanation I will use the following info:

  • Table has 7 million records (rows)
  • AggregationStrategyImpl extends AggregationStrategy with the following:
    • Returns a List<Object> in the exchange body
    • The aggregation Predicate completes when the List<Object> size >= 50000
    • The aggregation timeout is set to 30000 milliseconds
  • CustomThreadPool is a pseudo-implementation of Camel's ThreadPoolBuilder class:
    • PoolSize: 100
    • MaxPoolSize: 50000
    • MaxQueueSize: 500
    • TimeUnit: MILLISECONDS
    • KeepAliveTime: 30000
  • Both implementations are being autowired

Solution 1:

from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")

The code will still run on the Quartz cron timer (00:01 every day), but this time my QueryTable.class will fetch the proper query to execute (instead of SELECT *, I now specify the columns I need) and set it as the exchange body.

.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")

The Camel jdbc component will take the query from the exchange body, set resetAutoCommit to false so it doesn't throw the Cursor not open error, set the output to streaming, and split-stream the execution. That way I am not querying all the records at once, but instead one by one. Every record fetched is then transformed into a proper POJO via TransformRecord.class.

.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();

This time I use the Aggregate EIP to create a list of records. The aggregationStrategyImpl contains the aggregation logic as well as the completion predicate and timeout, so when I reach a certain number of records (or the timeout occurs) the list is sent to "direct:start-processing".
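A minimal sketch of that aggregation logic, stripped of Camel's Exchange/AggregationStrategy API (class and method names here are illustrative, not my actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the AggregationStrategyImpl logic, minus Camel's
// Exchange API: append each incoming record to the current batch, and report
// completion once the batch reaches the configured size (50000 above).
class BatchAggregator {
    private final int completionSize;
    private List<Object> batch = new ArrayList<>();

    BatchAggregator(int completionSize) { this.completionSize = completionSize; }

    // Mirrors AggregationStrategy.aggregate(): merge the new record into the batch.
    List<Object> aggregate(Object newRecord) {
        batch.add(newRecord);
        return batch;
    }

    // Mirrors the completion Predicate: true once the batch is large enough.
    boolean isComplete() { return batch.size() >= completionSize; }

    // Hand over the full batch (as the aggregate EIP does on completion) and start fresh.
    List<Object> drain() {
        List<Object> full = batch;
        batch = new ArrayList<>();
        return full;
    }
}
```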

More on the aggregation implementation in this Source Allies blog and in the Apache Camel Aggregate EIP docs.

from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();

Here I split the list obtained and, using a custom ThreadPool, create N threads to analyze and process each record individually. This way I can process the list in parallel instead of one by one. I could have used .split(body()).parallelProcessing(), but the default ThreadPool settings might not be optimal later on.
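For reference, the CustomThreadPool settings from the info section correspond to a plain-JDK executor roughly like this (a sketch of the equivalent, not the Camel ThreadPoolBuilder code itself):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK equivalent of the CustomThreadPool settings listed above; Camel's
// ThreadPoolBuilder ultimately builds a ThreadPoolExecutor much like this one.
// Note that threads beyond the core 100 are only created once the 500-slot
// queue is full, since that is how ThreadPoolExecutor grows its pool.
class CustomThreadPoolSketch {
    static ThreadPoolExecutor build() {
        return new ThreadPoolExecutor(
                100,                              // PoolSize (core threads)
                50000,                            // MaxPoolSize
                30000, TimeUnit.MILLISECONDS,     // KeepAliveTime + TimeUnit
                new LinkedBlockingQueue<>(500));  // MaxQueueSize
    }
}
```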

More on the ThreadPool implementation on the Apache Camel Threading Model docs, the ThreadPool Configuration notes and the Red Hat Threading Model docs.

Solution 2:

This solution runs basically the exact same way, but with the following changes:

// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally

What this does is send the list to be processed asynchronously, allowing up to one other list to be queued in memory and pausing the parent thread if the queue is full. So instead of waiting for the list of records to be processed, the parent thread goes back and collects another batch of records. This also means that if the processing route hasn't finished, the new records won't be thrown away; the parent thread will wait until it can send the batch to the SEDA in-memory queue.
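The blocking behaviour of seda:start-processing?size=1&blockWhenFull=true can be pictured with a bounded JDK queue (just an analogy; SEDA's internals differ):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Analogy for the SEDA endpoint with size=1&blockWhenFull=true: a bounded
// queue holding at most one batch. While the consumer hasn't taken the queued
// batch, offer() fails, and a blocking put() would pause the parent route.
class SedaAnalogy {
    static BlockingQueue<Object> newQueue() {
        return new ArrayBlockingQueue<>(1); // size=1
    }
}
```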

More on the SEDA component in the Apache Camel SEDA component docs on GitHub and on their site.

Conclusions:

With solution 1 it should take much longer to complete, since it processes all the data before gathering more records from the query, but the memory consumption should be FAR less since it is bounded by the aggregation predicate.

With solution 2 it should be much faster, since it gathers the next batch of records from the query while processing the previous batch, but the memory consumption will be FAR greater since it holds at most 3 lists: the one being processed, the one in the SEDA queue, and the latest batch gathered by the parent thread (paused when the queue is full).

I stated that I am still testing these solutions because with 500k records they work, but I'm still working out the optimal ThreadPool settings for the server where this will be deployed. I have researched threading in Java, but it seems there really isn't much to go on other than the system's architecture, RAM, and trial and error.

Upvotes: 6
