Bandita Pradhan
Bandita Pradhan

Reputation: 97

Records are not written to files when invoked from BillerOrderWriter, which implements ItemWriter, in Spring Batch

I am trying to write successful records using one writer and failed records using another writer.

I have written a BillerOrderWriter class which implements ItemWriter. I added some log statements, and I can see that it logs the success billerOrderId or the failed billerOrderId. But it seems that it does not invoke DatabaseToCsvFileJobConfig or SuccessfulOrdersToCsvFileJobConfig.


/**
 * Item writer that walks the items of a chunk, logs each one, and branches on
 * {@code item.getResult()}.
 *
 * <p>NOTE(review): in {@link #write(List)} the two branch calls invoke the {@code @Bean}
 * factory methods of the injected configuration objects and discard the writers they
 * return; no {@code write(...)} is called on those writers in this class.
 */
public class BillerOrderWriter implements ItemWriter<BillerOrder>{
    
    // NOTE(review): the logger name is the literal string "BillerOrderWriter.class",
    // not the class itself.
    private static Logger log = LoggerFactory.getLogger("BillerOrderWriter.class");
    
    // Configuration object that declares the CSV writer for successful orders.
    @Autowired
    SuccessfulOrdersToCsvFileJobConfig successfulOrdersToCsvFileJobConfig;
    
    // Configuration object that declares the CSV writer for failed orders.
    @Autowired
    DatabaseToCsvFileJobConfig databaseToCsvFileJobConfig;
    
    /**
     * Logs every item of the chunk and branches on whether its result equals "SUCCESS".
     *
     * @param items the items of the current chunk
     * @throws Exception declared by the {@code ItemWriter} contract
     */
    @Override
    public void write(List<? extends BillerOrder> items) throws Exception {
                
        for (BillerOrder item : items) {
            log.info("item = " + item.toString());
            if (item.getResult().equals("SUCCESS")) {
                log.info(" Success billerOrderId = " + item.getBillerOrderId());
                // Return value (the success writer) is discarded; the item is not passed to it.
                successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter();
            } else {
                log.info("Failed billerOrderId = " + item.getBillerOrderId());
                // Return value (the failed writer) is discarded; the item is not passed to it.
                databaseToCsvFileJobConfig.databaseCsvItemWriter();
            }
        }   
    }
}

Here is BatchConfig class.

 /** Declares the {@link BillerOrderWriter} bean that is injected into {@code step1}. */
 @Bean
  public BillerOrderWriter billerOrderWriter() {
      BillerOrderWriter orderWriter = new BillerOrderWriter();
      return orderWriter;
  }
  
 
  
  /**
   * Builds the {@code importJobOrder} job: a single-step flow with a run-id incrementer
   * and the supplied completion listener.
   *
   * @param listener listener registered on the job
   * @param step1    the only step of the flow
   * @return the assembled job
   */
  @Bean
  public Job importJobOrder(JobCompletionNotificationListner listener, Step step1) {
      Job importJob = jobBuilderFactory
              .get("importJobOrder")
              .incrementer(new RunIdIncrementer())
              .listener(listener)
              .flow(step1)
              .end()
              .build();
      return importJob;
  }
  
  /**
   * Builds {@code step1}: a chunk-oriented step (chunk size 10) that reads, processes and
   * writes {@code BillerOrder} items.
   *
   * @param billerOrderWriter the writer that receives each processed chunk
   * @return the assembled step
   */
  @Bean(name="step1")
  public Step step1(BillerOrderWriter billerOrderWriter) {
      ItemReader<? extends BillerOrder> orderReader = (ItemReader<? extends BillerOrder>) reader();

      Step chunkStep = stepBuilderFactory.get("step1")
              .<BillerOrder, BillerOrder> chunk(10)
              .reader(orderReader)
              .processor(processor())
              .writer(billerOrderWriter)
              .build();
      return chunkStep;
  }
 

Here are my success writer and failed writer classes.


/**
 * Spring configuration that declares the flat-file (CSV) writer for successfully
 * processed {@code BillerOrder} items.
 */
@Configuration
public class SuccessfulOrdersToCsvFileJobConfig {
    // Class literal (not a string) so the logger category is the real class name.
    private static final Logger log = LoggerFactory.getLogger(SuccessfulOrdersToCsvFileJobConfig.class);

    /**
     * Creates the writer that appends successful orders to
     * {@code /tmp/SuccessBillerOrderIdForRetry.csv} as {@code billerOrderId;successMessage}.
     *
     * <p>The concrete {@code FlatFileItemWriter} type is returned (instead of the bare
     * {@code ItemWriter} interface) so that callers can also treat the bean as a stream,
     * e.g. when registering it on a step with {@code .stream(...)}.
     *
     * @return the configured CSV writer; it still has to be opened by the step that uses it
     */
    @Bean
    public FlatFileItemWriter<BillerOrder> successfulDatabaseCsvItemWriter() {
        log.info("Entering SuccessfulOrdersToCsvFileJobConfig...");
        FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();

        // Distinct name: when two flat-file writers are registered on the same step they would
        // otherwise share the default execution-context key prefix.
        csvFileWriter.setName("successfulDatabaseCsvItemWriter");

        String exportFileHeader = "BillerOrderId;SuccessMessage";
        OrderWriter headerWriter = new OrderWriter(exportFileHeader);
        csvFileWriter.setHeaderCallback(headerWriter);

        String exportFilePath = "/tmp/SuccessBillerOrderIdForRetry.csv";
        csvFileWriter.setResource(new FileSystemResource(exportFilePath));

        LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
        csvFileWriter.setLineAggregator(lineAggregator);

        return csvFileWriter;
    }

    /** Builds a {@code ;}-delimited aggregator over the fields chosen by the extractor. */
    private LineAggregator<BillerOrder> createOrderLineAggregator() {
        log.info("Entering createOrderLineAggregator...");
        DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(";");

        FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
        lineAggregator.setFieldExtractor(fieldExtractor);

        return lineAggregator;
    }

    /** Selects the bean properties written per line: {@code billerOrderId}, {@code successMessage}. */
    private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
        log.info("Entering createOrderFieldExtractor...");
        BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"billerOrderId","successMessage"});
        return extractor;
    }
}

/**
 * Spring configuration that declares the flat-file (CSV) writer for failed
 * {@code BillerOrder} items.
 */
@Configuration
public class DatabaseToCsvFileJobConfig {
    // Class literal (not a string) so the logger category is the real class name.
    private static final Logger log = LoggerFactory.getLogger(DatabaseToCsvFileJobConfig.class);

    /**
     * Creates the writer that appends failed orders to
     * {@code /tmp/FailedBillerOrderIdForRetry.csv} as {@code billerOrderId;errorMessage}.
     *
     * <p>The concrete {@code FlatFileItemWriter} type is returned (instead of the bare
     * {@code ItemWriter} interface) so that callers can also treat the bean as a stream,
     * e.g. when registering it on a step with {@code .stream(...)}.
     *
     * @return the configured CSV writer; it still has to be opened by the step that uses it
     */
    @Bean
    public FlatFileItemWriter<BillerOrder> databaseCsvItemWriter() {
        log.info("Entering databaseCsvItemWriter...");
        FlatFileItemWriter<BillerOrder> csvFileWriter = new FlatFileItemWriter<>();

        // Distinct name: when two flat-file writers are registered on the same step they would
        // otherwise share the default execution-context key prefix.
        csvFileWriter.setName("databaseCsvItemWriter");

        String exportFileHeader = "BillerOrderId;ErrorMessage";
        OrderWriter headerWriter = new OrderWriter(exportFileHeader);
        csvFileWriter.setHeaderCallback(headerWriter);

        String exportFilePath = "/tmp/FailedBillerOrderIdForRetry.csv";
        csvFileWriter.setResource(new FileSystemResource(exportFilePath));

        LineAggregator<BillerOrder> lineAggregator = createOrderLineAggregator();
        csvFileWriter.setLineAggregator(lineAggregator);

        return csvFileWriter;
    }

    /** Builds a {@code ;}-delimited aggregator over the fields chosen by the extractor. */
    private LineAggregator<BillerOrder> createOrderLineAggregator() {
        log.info("Entering createOrderLineAggregator...");
        DelimitedLineAggregator<BillerOrder> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(";");

        FieldExtractor<BillerOrder> fieldExtractor = createOrderFieldExtractor();
        lineAggregator.setFieldExtractor(fieldExtractor);

        return lineAggregator;
    }

    /** Selects the bean properties written per line: {@code billerOrderId}, {@code errorMessage}. */
    private FieldExtractor<BillerOrder> createOrderFieldExtractor() {
        log.info("Entering createOrderFieldExtractor...");
        BeanWrapperFieldExtractor<BillerOrder> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(new String[] {"billerOrderId","errorMessage"});
        return extractor;
    }
}

Here is my job completion listener class.

/**
 * Job listener that logs when the job has finished.
 */
@Component
public class JobCompletionNotificationListner extends JobExecutionListenerSupport {

    private static final org.slf4j.Logger log = LoggerFactory.getLogger(JobCompletionNotificationListner.class);

    /**
     * Called after the job has ended; logs the callback and, for a completed job, a
     * completion message.
     *
     * @param jobExecution the execution that just finished
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        log.info("In afterJob ...");
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            // Instantiating the @Configuration classes with `new` here would bypass the Spring
            // container and the resulting objects would simply be discarded, so this hook
            // only reports completion.
            log.info("Job completed successfully");
        }
    }
}

Upvotes: 0

Views: 49

Answers (2)

Bandita Pradhan
Bandita Pradhan

Reputation: 97

Instead of BillerOrderWriter, I wrote a BillerOrderClassifier class.

/**
 * Classifier that picks the delegate writer for a {@code BillerOrder}: the success writer
 * when the order's result is {@code "SUCCESS"}, the failed writer otherwise.
 */
public class BillerOrderClassifier implements Classifier<BillerOrder, ItemWriter<? super BillerOrder>> {
 
    private static final long serialVersionUID = 1L;

    /** Result value that marks an order as successfully processed. */
    private static final String SUCCESS_RESULT = "SUCCESS";
     
    private final ItemWriter<BillerOrder> successItemWriter;
    private final ItemWriter<BillerOrder> failedItemWriter;
 
    /**
     * @param successItemWriter writer that receives orders whose result is {@code "SUCCESS"}
     * @param failedItemWriter  writer that receives every other order
     */
    public BillerOrderClassifier(ItemWriter<BillerOrder> successItemWriter, ItemWriter<BillerOrder> failedItemWriter) {
        this.successItemWriter = successItemWriter;
        this.failedItemWriter = failedItemWriter;
    }
 
    /**
     * Chooses the writer for one order.
     *
     * @param billerOrder the order to route
     * @return the success writer for a {@code "SUCCESS"} result, otherwise the failed writer
     */
    @Override
    public ItemWriter<? super BillerOrder> classify(BillerOrder billerOrder) {
        // Constant-first equals: an order without a result is routed to the failed writer
        // instead of aborting the chunk with a NullPointerException.
        return SUCCESS_RESULT.equals(billerOrder.getResult()) ? successItemWriter : failedItemWriter;
    }

}

In BatchConfiguration, I wrote a classifierBillerOrderCompositeItemWriter method.

/**
 * Declares the composite writer that delegates each item to the writer chosen by a
 * {@code BillerOrderClassifier} built from the success and failed CSV writers.
 *
 * @return the composite writer with its classifier set
 * @throws Exception declared by the original signature
 */
@Bean
  public ClassifierCompositeItemWriter<BillerOrder> classifierBillerOrderCompositeItemWriter() throws Exception {
      BillerOrderClassifier orderClassifier = new BillerOrderClassifier(
              successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter(),
              databaseToCsvFileJobConfig.databaseCsvItemWriter());

      ClassifierCompositeItemWriter<BillerOrder> routingWriter = new ClassifierCompositeItemWriter<>();
      routingWriter.setClassifier(orderClassifier);
      return routingWriter;
  }

/**
 * Builds {@code step1}: a chunk-oriented step (chunk size 10) whose writer is the
 * classifier composite writer, with both CSV writers additionally registered as streams.
 *
 * @return the assembled step
 * @throws Exception declared by the original signature
 */
@Bean(name="step1")
  public Step step1() throws Exception{
      ItemReader<? extends BillerOrder> orderReader = (ItemReader<? extends BillerOrder>) reader();

      Step chunkStep = stepBuilderFactory.get("step1")
              .<BillerOrder, BillerOrder> chunk(10)
              .reader(orderReader)
              .processor(processor())
              .writer(classifierBillerOrderCompositeItemWriter())
              // the delegates are registered explicitly as streams in addition to the composite writer
              .stream(successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter())
              .stream(databaseToCsvFileJobConfig.databaseCsvItemWriter())
              .build();
      return chunkStep;
  }

Upvotes: 0

Mahmoud Ben Hassine
Mahmoud Ben Hassine

Reputation: 31600

In your BillerOrderWriter#write method, you are supposed to write code that does the actual write operation of items to a data sink. But in your case, you are calling successfulOrdersToCsvFileJobConfig.successfulDatabaseCsvItemWriter(); and databaseToCsvFileJobConfig.databaseCsvItemWriter(); which only create item writer beans. You should inject those delegate writers and call their write method when needed, something like:

/**
 * Item writer that forwards each item to one of two delegate writers: the success writer
 * when the item's result is {@code "SUCCESS"}, the failed writer otherwise.
 *
 * <p>NOTE(review): this class only calls {@code write} on the delegates; it does not open
 * or close them. If the delegates are file writers they presumably need to be registered
 * as streams on the step — confirm against the step configuration.
 */
public class BillerOrderWriter implements ItemWriter<BillerOrder>{

   private final ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter;
   private final ItemWriter<BillerOrder> databaseCsvItemWriter;

   /**
    * @param successfulDatabaseCsvItemWriter delegate for items whose result is {@code "SUCCESS"}
    * @param databaseCsvItemWriter           delegate for all other items
    */
   public BillerOrderWriter(ItemWriter<BillerOrder> successfulDatabaseCsvItemWriter,
                            ItemWriter<BillerOrder> databaseCsvItemWriter) {
      this.successfulDatabaseCsvItemWriter = successfulDatabaseCsvItemWriter;
      this.databaseCsvItemWriter = databaseCsvItemWriter;
   }

   /**
    * Writes every item of the chunk through the delegate that matches its result.
    *
    * @param items the items of the current chunk
    * @throws Exception if a delegate fails to write
    */
   @Override
   public void write(List<? extends BillerOrder> items) throws Exception {
            
      for (BillerOrder item : items) {
        // Constant-first equals: an item without a result goes to the failed writer
        // instead of failing the chunk with a NullPointerException.
        if ("SUCCESS".equals(item.getResult())) {
            successfulDatabaseCsvItemWriter.write(Collections.singletonList(item));
        } else {
            databaseCsvItemWriter.write(Collections.singletonList(item));
        }
      }   
   }
}

Upvotes: 1

Related Questions