Cygnusx1

Reputation: 5409

ItemReaderAdapter and ItemStream

<bean id="itemReader" class="org.springframework.batch.item.adapter.ItemReaderAdapter">
  <property name="targetObject" ref="fooService" />
  <property name="targetMethod" value="generateFoo" />
</bean>

Given this simple setup, where fooService is a plain POJO Spring bean:

If fooService implements ItemStream and I correctly implement the open and update methods, will my ItemReader be restartable?

Regards

Upvotes: 1

Views: 8015

Answers (2)

Ramesh Papaganti

Reputation: 7861

Another way to achieve the same thing is to extend AbstractItemCountingItemStreamItemReader.

From the documentation:

Abstract superclass for ItemReaders that supports restart by storing item count in the ExecutionContext (therefore requires item ordering to be preserved between runs). Subclasses are inherently not thread-safe.

Sample code:

package com.***.batch.reader;

import java.util.List;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.mivim.campaignmanager.data.model.custom.CustomCampaignSubscriberEmail;
import com.mivim.campaignmanager.service.CampaignSubscriberService;


@Component
public class CampaignSubscriberItemReader extends
        AbstractItemCountingItemStreamItemReader<CustomCampaignSubscriberEmail> {

    private Logger logger = LogManager.getLogger(CampaignSubscriberItemReader.class);

    @Autowired
    CampaignSubscriberService campaignSubscriberService;

    List<CustomCampaignSubscriberEmail> customCampaignSubscriberEmails;

    final String ecName = "csItemReaderContext";

    public CampaignSubscriberItemReader() {
        setName(ecName);
    }

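    @Override
    protected CustomCampaignSubscriberEmail doRead() throws Exception {
        // The base class increments the item count before calling doRead(),
        // so the zero-based list index is the current count minus one.
        return customCampaignSubscriberEmails.get(getCurrentItemCount() - 1);
    }

    @Override
    protected void doOpen() throws Exception {
        // Load the items once per step execution; restart relies on the
        // item ordering being preserved between runs.
        customCampaignSubscriberEmails = campaignSubscriberService.getPendingCampaignSubscriber();

        // Makes read() return null once every loaded item has been read.
        setMaxItemCount(customCampaignSubscriberEmails.size());
    }

    @Override
    protected void doClose() throws Exception {
        // Guard against doOpen() having failed before the list was assigned.
        if (customCampaignSubscriberEmails != null) {
            customCampaignSubscriberEmails.clear();
        }
        setMaxItemCount(0);
        setCurrentItemCount(0);
    }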


}
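
To make the restart mechanism easier to see, here is a minimal sketch of the item-count idea in plain Java, with no Spring on the classpath. It is not the Spring Batch implementation: CountingListReader and CountingRestartDemo are hypothetical names, a Map stands in for the ExecutionContext, and the key name is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an item-counting reader; not a Spring Batch class.
class CountingListReader {
    // Assumed key name, used only for this illustration.
    private static final String KEY = "csItemReaderContext.read.count";

    private final List<String> items;
    private int currentItemCount = 0;

    CountingListReader(List<String> items) {
        this.items = items;
    }

    // Restore the position saved by a previous run, or start at 0.
    void open(Map<String, Integer> context) {
        currentItemCount = context.getOrDefault(KEY, 0);
    }

    // Return the next item, or null once the list is exhausted.
    String read() {
        if (currentItemCount >= items.size()) {
            return null;
        }
        return items.get(currentItemCount++);
    }

    // Save the current position so that a later run can resume from it.
    void update(Map<String, Integer> context) {
        context.put(KEY, currentItemCount);
    }
}

class CountingRestartDemo {
    // Reads two items, checkpoints, then resumes with a fresh reader
    // and returns the first item that the resumed reader produces.
    static String firstItemAfterRestart() {
        List<String> items = List.of("a", "b", "c", "d");
        Map<String, Integer> context = new HashMap<>();

        CountingListReader firstRun = new CountingListReader(items);
        firstRun.open(context);
        firstRun.read();
        firstRun.read();
        firstRun.update(context);

        CountingListReader secondRun = new CountingListReader(items);
        secondRun.open(context);
        return secondRun.read();
    }

    public static void main(String[] args) {
        System.out.println(firstItemAfterRestart()); // prints "c"
    }
}
```

The resumed reader only lands on the right item if the list comes back in the same order on both runs, which is the ordering requirement the documentation mentions.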

Upvotes: 0

Cygnusx1

Reputation: 5409

OK, I finally answered my own question.

I find ItemReaderAdapter very helpful because most of the time we already have some kind of DAO or service giving access to the data we need.

But my tests showed that readers built on the out-of-the-box ItemReaderAdapter are not restartable, since it doesn't implement ItemStream!

So if any of you want to use ItemReaderAdapter with restart support, here is my solution.

Tested and working ;-)

1) Create your own implementation of ItemReaderAdapter:

package xxx.readers.adapters;

import org.apache.log4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.adapter.AbstractMethodInvokingDelegator;


/**
 * Invokes a custom method on a delegate plain old Java object which itself
 * provides an item.
 *
 * Unlike the stock ItemReaderAdapter, this class also implements the
 * ItemStream interface so that the read position survives a restart.
 *
 * @author Benoit Campeau
 */
public class MyItemReaderAdapter<T> extends AbstractMethodInvokingDelegator<T> implements ItemReader<T>, ItemStream {

    private static final Logger log = Logger.getLogger(MyItemReaderAdapter.class);

    // Key under which the read position is stored in the ExecutionContext.
    private static final String CONTEXT_COUNT_KEY = "ReglementAdapter.count";

    // Index of the next item to request from the delegate.
    private long currentCount = 0;

    /**
     * @return return value of the target method.
     */
    @Override
    public T read() throws Exception {
        // Pass the current index to the delegate method, then advance it.
        super.setArguments(new Long[] { currentCount++ });
        return invokeDelegateMethod();
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // Resume from the saved position, or start at 0 on a fresh run.
        currentCount = executionContext.getLong(CONTEXT_COUNT_KEY, 0);
        log.info("Open Stream current count : " + currentCount);
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Record the position so that a restart can pick it up in open().
        executionContext.putLong(CONTEXT_COUNT_KEY, currentCount);
        log.info("Update Stream current count : " + currentCount);
    }

    @Override
    public void close() throws ItemStreamException {
        // Nothing to release.
    }
}

2) Now configure a reader as an adapter using your implementation (MyItemReaderAdapter):

<bean id="MyReader" class="xxx.readers.adapters.MyItemReaderAdapter">
    <property name="targetObject" ref="someAdapter" />
    <property name="targetMethod" value="next" />       
</bean>

3) Finally, create a component that will serve as the adapter's delegate class:

package fcdq.iemt.batch.validation.reglement.readers.adapters;

import java.util.List;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


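@Component("someAdapter")
public class SomeAdapter {

    private static final Logger log = Logger.getLogger(SomeAdapter.class);

    @Autowired
    private SomeService srv1;

    // Items to read; loaded by init() just before the step runs.
    private List<Transaction> listTrx;

    public void init() {
        log.info("Initializing " + SomeAdapter.class.toString());
        // NOTE: "context" is not declared in this snippet; it is assumed to be
        // an object that exposes the cutoff timestamps.
        listTrx = srv1.findByTimestampAndStatus(context.getBeginTSCutoff(), context.getEndTSCutoff(), TransactionTypeEnum.TRANSFER_COMPLETE);
    }

    /**
     * Read method delegate.
     *
     * @param index zero-based position of the item to return
     * @return the item at that position, or null when there are no more items
     */
    public Transaction next(Long index) {
        if (listTrx != null && listTrx.size() > index) {
            return listTrx.get(index.intValue());
        } else {
            return null;
        }
    }
}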

Things to note:

  1. Notice the setArguments call in MyItemReaderAdapter. It is required in order to pass the value of currentCount, restored from the execution context, to the delegate's next() method.

  2. Notice that SomeAdapter does NOT implement the InitializingBean interface. I call its init() method from a step listener instead, because I want to initialize my list of items to be read just in time.
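
The two notes above can be sketched in plain Java, without Spring, to show how they fit together. This is only an illustration under assumptions: IndexedDelegate, IndexPassingReader and IndexPassingDemo are hypothetical names, a Map stands in for the ExecutionContext, and the explicit init() call plays the role of the step listener.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical delegate: keeps no read position of its own, like SomeAdapter.
class IndexedDelegate {
    private List<String> items; // stays null until init() runs

    // In the real job, this is what the step listener would trigger.
    void init() {
        items = List.of("t0", "t1", "t2");
    }

    // Returns the item at the given index, or null when there is none.
    String next(Long index) {
        if (items != null && index < items.size()) {
            return items.get(index.intValue());
        }
        return null;
    }
}

// Hypothetical adapter: owns the counter and passes it to the delegate.
class IndexPassingReader {
    private static final String KEY = "ReglementAdapter.count";

    private final IndexedDelegate delegate;
    private long currentCount = 0;

    IndexPassingReader(IndexedDelegate delegate) {
        this.delegate = delegate;
    }

    // Restore the saved index, or start at 0.
    void open(Map<String, Long> context) {
        currentCount = context.getOrDefault(KEY, 0L);
    }

    // Hand the current index to the delegate, then advance it.
    String read() {
        return delegate.next(currentCount++);
    }

    // Save the index so that a later run can resume from it.
    void update(Map<String, Long> context) {
        context.put(KEY, currentCount);
    }
}

class IndexPassingDemo {
    // Reads one item, checkpoints, then resumes with a new reader instance
    // and returns the first item that the resumed reader produces.
    static String resumeAfterOneRead() {
        IndexedDelegate delegate = new IndexedDelegate();
        delegate.init(); // "just in time", before any read
        Map<String, Long> context = new HashMap<>();

        IndexPassingReader firstRun = new IndexPassingReader(delegate);
        firstRun.open(context);
        firstRun.read();
        firstRun.update(context);

        IndexPassingReader secondRun = new IndexPassingReader(delegate);
        secondRun.open(context);
        return secondRun.read();
    }

    public static void main(String[] args) {
        System.out.println(resumeAfterOneRead()); // prints "t1"
    }
}
```

Because the delegate only ever sees an index, all restart state lives in the adapter. If init() has not run yet, next() simply returns null, which a reader treats as the end of the data.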

Hope it will help someone else.

Regards

Upvotes: 5
