Ehsan Zangeneh

Reputation: 23

Camel loop uses a different exchange in the third iteration

I am new to Apache Camel and am working with Red Hat JBoss Developer Studio 11.0.0.GA. I am trying to read a large number of records from one table and insert them into another. Since I couldn't insert all the records at once (I think Camel has a limitation when inserting 7000 records), I used a Camel loop. First I fetch all the records, process them, and set the resulting list on an exchange property. I also set an Integer property that indicates how many records to skip in the current iteration. My problem is that in the third iteration the exchange object is different: I debugged it and found that the exchangeId has changed, so all the properties I had set are gone.

    <camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
        <route id="_route1">
            <from id="_from1" uri="file:work/input"/>
            <to id="_to2" uri="sqlAbniyeh: select id , CRATERSLIST from ABNIYEH_BRIDGE "/>
            <setProperty propertyName="isThereAnyRecord">
                <simple resultType="java.lang.Boolean">true</simple>
            </setProperty>
            <loop doWhile="true" id="_loop1">
                <simple>${exchangeProperty.isThereAnyRecord} != false</simple>
                <process id="_process1" ref="craters"/>
                <to id="_to1" uri="sqlAbniyeh:  insert into Z_ABNIYEH_BRIDGE_CRATERS( ID , NUMBER1 , LENGTH , BRIDGE_ID) values( HIBERNATE_SEQUENCE.nextval , :#value1 , :#value2 , :#id)?batch=true"/>
            </loop>
        </route>
    </camelContext>

This is my process method:

    public void process(Exchange exchange) throws Exception {
        try
        {
            System.out.println("Entered Process method!");
            List<Map<String, Object>> currentList = new ArrayList<Map<String, Object>>();
            List<Map<String, Object>> newList = new ArrayList<Map<String, Object>>();
            int numberOfRecordsToSkip = 0;
            int numberOfRecordsToSkipForTheNextTime;
            List<Map<String, Object>> currentListOnProperties = (List<Map<String, Object>>) exchange.getProperty("listOfRecords");
            numberOfRecordsToSkip = exchange.getProperty("numberOfRecordsToSkip") != null ?
                    (Integer) exchange.getProperty("numberOfRecordsToSkip") : 0;

            if (currentListOnProperties != null)
            {
                newList = currentListOnProperties;
            }
            else
            {
                // This happens only on the first iteration
                currentList = (List<Map<String, Object>>) exchange.getIn().getBody();
                newList = OrganizeListForInsert(currentList);
            }

            int temp = (numberOfRecordsToSkip + NUMBER_OF_RECORDS_FOR_EACH_ROUND);
            if (temp < newList.size())
            {
                numberOfRecordsToSkipForTheNextTime = temp;
            }
            else
            {
                numberOfRecordsToSkipForTheNextTime = numberOfRecordsToSkip + (currentList.size() - numberOfRecordsToSkip);
                exchange.removeProperty("isThereAnyRecord");
                exchange.setProperty("isThereAnyRecord", false);
            }
            exchange.removeProperty("numberOfRecordsToSkip");
            exchange.setProperty("numberOfRecordsToSkip", new Integer(numberOfRecordsToSkipForTheNextTime));
            exchange.setProperty("listOfRecords", newList);
            List<Map<String, Object>> sublistOfNewList =
                    new ArrayList<Map<String, Object>>(newList.subList(numberOfRecordsToSkip, numberOfRecordsToSkipForTheNextTime));
            exchange.getIn().setBody(sublistOfNewList);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        System.out.println("End of everything!!");
    }

Upvotes: 0

Views: 885

Answers (1)

Ehsan Zangeneh

Reputation: 23

As Bedla said, I should have used EIPs instead of writing everything in Java. I used the Splitter EIP and it works well for my case. Here is the new code:

    <camelContext id="_context1" xmlns="http://camel.apache.org/schema/blueprint">
        <route id="_route1">
            <from id="_from1" uri="file:work/input"/>
            <to id="_to2" uri="sqlAbniyeh: select id , CRATERSLIST from ABNIYEH_BRIDGE "/>
            <process id="_process1" ref="craters"/>
            <split>
                <simple>${body}</simple>
                <to id="_to1" uri="sqlAbniyeh:  insert into ABNIYEH_BRIDGE_CRATERS( ID , NUMBER1 , LENGTH , BRIDGE_ID) values( HIBERNATE_SEQUENCE.nextval , :#value1 , :#value2 , :#id)"/>
            </split>
        </route>
    </camelContext>

and the process method:

    @Override
    public void process(Exchange exchange) throws Exception {
        try
        {
            List<Map<String, Object>> currentList = new ArrayList<Map<String, Object>>();
            List<Map<String, Object>> newList = new ArrayList<Map<String, Object>>();
            currentList = (List<Map<String, Object>>) exchange.getIn().getBody();
            newList = OrganizeListForInsert(currentList);
            exchange.getIn().setBody(newList);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
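
With the Splitter, the skip counter and the loop properties are no longer needed, because each element of the body becomes its own exchange. If batched inserts are still wanted, one possible approach is to have the processor return a list of fixed-size chunks, so that the split emits one chunk per exchange instead of one row. The following is only a sketch of that chunking step in plain Java. `BatchPartitioner`, `partition`, and the batch size of 3 are hypothetical and not part of the route above, and sending each chunk to an endpoint with `batch=true` is untested here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: not part of the original route or processor.
public class BatchPartitioner {

    // Splits 'records' into consecutive chunks of at most 'batchSize' elements.
    public static <T> List<List<T>> partition(List<T> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            // The last chunk may be shorter than batchSize.
            int to = Math.min(from + batchSize, records.size());
            // Copy the subList view so each batch is independent of the source list.
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            ids.add(i);
        }
        System.out.println(partition(ids, 3)); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Because the end index is always clamped to the list size, there is no separate "last round" branch and no state to carry between iterations.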

Upvotes: 2
