Reputation: 23
I am new to Apache Camel. I am working with Red Hat JBoss Developer Studio 11.0.0.GA. I am trying to read a large number of records from one table and insert them into another. Since I couldn't insert all the records at once (I think Camel has a limit when inserting 7000 records), I used a Camel loop. First I fetch all the records, do some processing on them, and set the resulting list on an exchange property. In addition, I set an Integer property that indicates how many records to skip in the current iteration. My problem is that in the third iteration the exchange object has changed: I debugged it and found that the exchangeId is different, so the properties I had set are all gone.
<camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="_route1">
        <from id="_from1" uri="file:work/input"/>
        <to id="_to2" uri="sqlAbniyeh: select id , CRATERSLIST from ABNIYEH_BRIDGE "/>
        <setProperty propertyName="isThereAnyRecord">
            <simple resultType="java.lang.Boolean">true</simple>
        </setProperty>
        <loop doWhile="true" id="_loop1">
            <simple>${exchangeProperty.isThereAnyRecord} != false</simple>
            <process id="_process1" ref="craters"/>
            <to id="_to1" uri="sqlAbniyeh: insert into Z_ABNIYEH_BRIDGE_CRATERS( ID , NUMBER1 , LENGTH , BRIDGE_ID) values( HIBERNATE_SEQUENCE.nextval , :#value1 , :#value2 , :#id)?batch=true"/>
        </loop>
    </route>
</camelContext>
This is my process method:
public void process(Exchange exchange) throws Exception {
    try
    {
        System.out.println("Entered Process method!");
        List<Map<String, Object>> currentList = new ArrayList<Map<String,Object>>();
        List<Map<String,Object>> newList = new ArrayList<Map<String,Object>>();
        int numberOfRecordsToSkip = 0;
        int numberOfRecordsToSkipForTheNextTime ;
        List<Map<String, Object>> currentListOnProperties = (List<Map<String,Object>>) exchange.getProperty("listOfRecords");
        numberOfRecordsToSkip = exchange.getProperty("numberOfRecordsToSkip") != null ?
            (Integer)exchange.getProperty("numberOfRecordsToSkip"): 0;
        if(currentListOnProperties != null)
        {
            newList = currentListOnProperties;
        }
        else
        {
            // It occurs just the first time
            currentList = (List<Map<String,Object>>)exchange.getIn().getBody();
            newList = OrganizeListForInsert(currentList);
        }
        int temp = (numberOfRecordsToSkip + NUMBER_OF_RECORDS_FOR_EACH_ROUND);
        if(temp < newList.size())
        {
            numberOfRecordsToSkipForTheNextTime = temp;
        }
        else
        {
            numberOfRecordsToSkipForTheNextTime = numberOfRecordsToSkip + ( currentList.size() - numberOfRecordsToSkip);
            exchange.removeProperty("isThereAnyRecord");
            exchange.setProperty("isThereAnyRecord", false);
        }
        exchange.removeProperty("numberOfRecordsToSkip");
        exchange.setProperty("numberOfRecordsToSkip", new Integer(numberOfRecordsToSkipForTheNextTime));
        exchange.setProperty("listOfRecords", newList);
        List<Map<String, Object>> sublistOfNewList =
            new ArrayList<Map<String,Object>>(newList.subList(numberOfRecordsToSkip, numberOfRecordsToSkipForTheNextTime));
        exchange.getIn().setBody(sublistOfNewList);
    }
    catch(Exception e)
    {
        e.printStackTrace();
    }
    System.out.println("End of everything!!");
}
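The skip bookkeeping above comes down to computing index windows over the full list. Here is a minimal sketch of that arithmetic in plain Java, with no Camel involved. `BatchWindows` and `windows` are hypothetical names used only for illustration, and the batch size stands in for `NUMBER_OF_RECORDS_FOR_EACH_ROUND`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the route or the processor above.
class BatchWindows {

    // Returns [from, to) index pairs covering `total` records in chunks of at most `batchSize`.
    static List<int[]> windows(int total, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<int[]> result = new ArrayList<>();
        for (int from = 0; from < total; from += batchSize) {
            // Clamp the end index to the list size so the last window may be shorter.
            int to = Math.min(from + batchSize, total);
            result.add(new int[] {from, to});
        }
        return result;
    }

    public static void main(String[] args) {
        // 10 records in batches of 4 give the windows 0..4, 4..8 and 8..10.
        for (int[] w : windows(10, 4)) {
            System.out.println(w[0] + ".." + w[1]);
        }
    }
}
```

Each pair could be passed straight to `List.subList(from, to)`. Note that in the processor above the end of the last window is derived from `currentList`, which is only filled on the first pass, so clamping against the list that is actually being sliced (`newList`) looks safer.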
Upvotes: 0
Views: 885
Reputation: 23
As Bedla said, I should have used EIPs instead of writing everything in Java. I used the Splitter EIP, and it works well for my case. Here is the new code:
<camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
    <route id="_route1">
        <from id="_from1" uri="file:work/input"/>
        <to id="_to2" uri="sqlAbniyeh: select id , CRATERSLIST from ABNIYEH_BRIDGE "/>
        <process id="_process1" ref="craters"/>
        <split>
            <simple>${body}</simple>
            <to id="_to1" uri="sqlAbniyeh: insert into ABNIYEH_BRIDGE_CRATERS( ID , NUMBER1 , LENGTH , BRIDGE_ID) values( HIBERNATE_SEQUENCE.nextval , :#value1 , :#value2 , :#id)"/>
        </split>
    </route>
</camelContext>
and the process method:
@Override
public void process(Exchange exchange) throws Exception {
    // The select endpoint leaves one Map per row in the message body.
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> currentList = (List<Map<String, Object>>) exchange.getIn().getBody();
    // OrganizeListForInsert (not shown) prepares the rows for the insert statement.
    List<Map<String, Object>> newList = OrganizeListForInsert(currentList);
    // The Splitter in the route iterates over this list, one insert per element.
    // Exceptions are left to propagate to Camel instead of being swallowed here.
    exchange.getIn().setBody(newList);
}
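For the split to work, every element of the list that the processor puts in the body has to supply the named parameters of the insert statement. As far as I know, the SQL component resolves `:#name` parameters from the message body when it is a `java.util.Map`, and otherwise from the headers. Below is a rough sketch of a sanity check that could be run on the list before it is set as the body. `RowCheck` and `firstIncompleteRow` are hypothetical names, not part of Camel or of my processor.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Camel or of the processor above.
class RowCheck {

    // Returns the index of the first row lacking one of the required keys,
    // or -1 if every row contains all of them.
    static int firstIncompleteRow(List<Map<String, Object>> rows, String... requiredKeys) {
        for (int i = 0; i < rows.size(); i++) {
            for (String key : requiredKeys) {
                if (!rows.get(i).containsKey(key)) {
                    return i;
                }
            }
        }
        return -1;
    }
}
```

With the insert statement above, the call would be `RowCheck.firstIncompleteRow(newList, "value1", "value2", "id")`, and any result other than -1 points at a row the insert would not be able to bind.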
Upvotes: 2