Akash deep
Akash deep

Reputation: 11

AOP configuration seems to be invalid - Spring batch (Skip Listener)

I am using spring batch and I'm trying to do a POC to skip the entries for which I get an exception. To do this, I have written a SkipListener with some logic to populate an external table with the failed records.

package in.novopay.accounting.batch.listeners;

import in.novopay.accounting.account.loans.entity.LoanAccountEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.Date;

@Component
@StepScope
public class LoanAccountDpdSkipListener implements SkipListener<Object[], LoanAccountEntity> {

    private static final Logger LOGGER = LoggerFactory.getLogger(LoanAccountDpdSkipListener.class);
    private JdbcTemplate jdbcTemplate;
    private static final String jobName = "loanAccountDpdCalcJob";
    private static final String groupCode = "LMS-EOD-BOD";

    @Autowired
    public LoanAccountDpdSkipListener(DataSource dataSource){
        jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    public void onSkipInWrite(LoanAccountEntity loanAccountEntity, Throwable t){
        LOGGER.info("***Skipping item***: Loan Account Number: {}", loanAccountEntity.getAccountNumber());
        String sql = "INSERT INTO batch_failure_audit (job_name, execution_date, context_type, context_value, failure_message, failure_stack_trace, group_code) VALUES (?, ?, ?, ?, ?, ?, ?)";

        jdbcTemplate.update(sql, jobName, new Date(), LoanAccountEntity.class, loanAccountEntity.getAccountNumber(),
                t.getMessage(), t.getStackTrace(), groupCode);

    }

    @Override
    public void onSkipInProcess(Object[] object, Throwable t){
        Long accountId = (Long) object[0];
        LOGGER.info("***Skipping item***: Loan Account ID: {}", accountId);
        String sql = "INSERT INTO batch_failure_audit (job_name, execution_date, context_type, context_value, failure_message, failure_stack_trace, group_code) VALUES (?, ?, ?, ?, ?, ?, ?)";

        jdbcTemplate.update(sql, jobName, new Date(), LoanAccountEntity.class, accountId.toString(),
                t.getMessage(), t.getStackTrace(), groupCode);

    }


}

My Item Reader

public class LoanAccountDpdCalcItemReader extends JdbcPagingItemReader<Object[]> {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoanAccountDpdCalcItemReader.class);

    public LoanAccountDpdCalcItemReader(DataSource dataSource , Map<String, Object> jobParameters, Long minValue , Long maxValue){
        super();
        setDataSource(dataSource);

        String querySelectClause=" SELECT la.account_id, la.loan_product_id, la.past_due_days, a.account_number, a.currency, " +
                " acs.criteria, acs.id, la.expected_disbursement_date, lp.product_id, la.loan_status, la.npa_ageing_start_date , a.office_id  ";
        String queryFromClause = "  FROM loan_account la JOIN account a ON a.id = la.account_id " +
                " JOIN asset_criteria_slabs acs ON acs.id = la.asset_criteria_slabs_id " +
                " JOIN loan_product lp ON lp.id = la.loan_product_id " +
                " JOIN product_scheme ps ON ps.id = a.product_scheme_id  ";

        String queryWhereClause=" WHERE la.loan_status IN ('ACTIVE','FORECLOSURE_FREEZE') " +
                " and la.account_id >="+minValue.intValue()+" and la.account_id <="+maxValue.intValue();


        final SqlPagingQueryProviderFactoryBean sqlPagingQueryProviderFactoryBean = new SqlPagingQueryProviderFactoryBean();
        sqlPagingQueryProviderFactoryBean.setDataSource(dataSource);
        sqlPagingQueryProviderFactoryBean.setSelectClause(querySelectClause);
        sqlPagingQueryProviderFactoryBean.setFromClause(queryFromClause);
        sqlPagingQueryProviderFactoryBean.setWhereClause(queryWhereClause);
        sqlPagingQueryProviderFactoryBean.setSortKey("account_id");
        try {
            PagingQueryProvider pagingQueryProvider= sqlPagingQueryProviderFactoryBean.getObject();
            if(pagingQueryProvider!=null){
                setQueryProvider(pagingQueryProvider);
            }

        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        setFetchSize(0);
        int chunk = getChunk(jobParameters);
        setPageSize(chunk);
        setRowMapper(new ResultMapper());
    }

    private  int getChunk(Map<String, Object> jobParameters) {
        int chunk =Constants.WORKER_CHUNK_SIZE;
        if(jobParameters.containsKey(Constants.CHUNK_KEY)){
            Object chunkParam =jobParameters.get(Constants.CHUNK_KEY);
            if(chunkParam instanceof String chunkParamStr){
                chunk  = Integer.parseInt(chunkParamStr);
            }else{
                chunk = (Integer) chunkParam;
            }

        }
        return chunk;
    }


}

My Item Processor

@Component
@StepScope
public class LoanAccountDpdCalcItemProcessor implements ItemProcessor<Object[], LoanAccountEntity> {

    @Autowired
    LoanAccountDpdCalcBatchProcessor loanAccountDpdCalcBatchProcessor;

    @Value("#{jobParameters['job_time']}")
    private String jobTime;

    @Override
    public LoanAccountEntity process(Object[] objects) throws Exception {

        if (StringUtils.isBlank(jobTime)) {
            return null;
        }
        Calendar nowCal = Calendar.getInstance();
        nowCal.setTimeInMillis(Long.parseLong(jobTime));

        return loanAccountDpdCalcBatchProcessor.processIndividualActiveAccount(objects, nowCal.getTime());
    }
}

My Item Writer

@Component
@StepScope
public class LoanAccountDpdCalcItemWriter extends JpaItemWriter<LoanAccountEntity> {
    @Autowired
    private LoanAccountDAOService loanAccountDAOService;

    LoanAccountDpdCalcItemWriter(EntityManagerFactory entityManagerFactory){
        super();
        setEntityManagerFactory(entityManagerFactory);
    }

    @Override
    public void write(Chunk<? extends LoanAccountEntity> chunk) {
        List<? extends LoanAccountEntity> list = chunk.getItems();
        List<LoanAccountEntity> entities = (List<LoanAccountEntity>) list;
        loanAccountDAOService.saveAll(entities);
    }
}

Building the step:

buildStep(ItemReader<I> itemReader,
                                  ItemProcessor<? super I, ? extends O> processor,
                                  ItemWriter<? super O> itemWriter, SkipListener<? super I, ? extends O> skipListener)
return workerStepBuilderFactory.get(stepName)
                    .repository(jobRepository)
                    .jobExplorer(jobExplorer)
                    .inputChannel(requestForWrks)
                    .outputChannel(repliesFromWrks)
                    .<I, O>chunk(chunkSize,transactionManager)
                    .reader(itemReader)
                    .writer(itemWriter)
                     .faultTolerant()
                        .skip(Exception.class)
                        .skipLimit(Integer.MAX_VALUE)
                        .listener(skipListener);

Now, when an exception is thrown in the itemprocessor(I throw it explicitly to test my setup), I get:

[2024-04-17 23:18:15.974] [] [accounting] [ThreadPoolEx-3] [5a6ab4f3c19dbe7c] [1b94098a260b50d75cdad626e7cf9521] [] [] [ERROR] [o.s.b.c.s.AbstractStep] Encountered an error executing step loanAccountDpdCalcJobsStep1 in job loanAccountDpdCalcJob
org.springframework.batch.core.step.skip.SkipListenerFailedException: Fatal exception in SkipListener.
java.lang.RuntimeException: Manually triggering exception to test skipping logic
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:450) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.write(FaultTolerantChunkProcessor.java:429) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:227) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:388) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:312) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-6.1.4.jar!/:6.1.4]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:255) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:261) ~[spring-batch-infrastructure-5.1.1.jar!/:5.1.1]
    at in.novopay.infra.batch.config.TenantAwareTaskDecorator.lambda$decorate$0(TenantAwareTaskDecorator.java:19) ~[infra-batch-0.0.4.RELEASE-plain.jar!/:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) [?:?]
    at java.lang.Thread.run(Thread.java:833) [?:?]
Caused by: org.springframework.aop.AopInvocationException: AOP configuration seems to be invalid: tried calling method [public void in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener.onSkipInWrite(in.novopay.accounting.account.loans.entity.LoanAccountEntity,java.lang.Throwable)] on target [in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener@7af736]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:359) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener$$SpringCGLIB$$0.onSkipInWrite(<generated>) ~[!/:?]
    at org.springframework.batch.core.listener.CompositeSkipListener.onSkipInWrite(CompositeSkipListener.java:72) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.listener.MulticasterBatchListener.onSkipInWrite(MulticasterBatchListener.java:297) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:447) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    ... 13 more
Caused by: java.lang.IllegalArgumentException: argument type mismatch
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:137) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717) ~[spring-aop-6.1.4.jar!/:6.1.4]
    at in.novopay.accounting.batch.listeners.LoanAccountDpdSkipListener$$SpringCGLIB$$0.onSkipInWrite(<generated>) ~[!/:?]
    at org.springframework.batch.core.listener.CompositeSkipListener.onSkipInWrite(CompositeSkipListener.java:72) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.listener.MulticasterBatchListener.onSkipInWrite(MulticasterBatchListener.java:297) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.callSkipListeners(FaultTolerantChunkProcessor.java:447) ~[spring-batch-core-5.1.1.jar!/:5.1.1]
    ... 13 more

I couldn't find anything relevant online, I'm new to AOP so the error is making little sense to me as I have checked the method signatures thoroughly.

I have also built a small sample batch project where this skipping logic seems to be working just fine.

Upvotes: 1

Views: 390

Answers (1)

Akash deep
Akash deep

Reputation: 11

The reasons for this error can be many. But in my case there were two versions of the spring-aop library in the classpath. One in the main application and another one sneaking in from one of our internal helper libraries. Fixing that got rid of the error above.

Upvotes: 0

Related Questions