Niraj Sonawane
Niraj Sonawane

Reputation: 11055

Spring Batch MultiLineItemReader with MultiResourcePartitioner

I have a file that contains multi-line data like the example below. A DataID line marks the start of a new record: one record consists of the ID line concatenated with every following line, up to the start of the next record.

    >DataID1
    Line1asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    Line2asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    Line3asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    >DataID2
    DataID2asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm
    >DataID3
    DataID2asdfsafsdgdsfghfghfghjfgjghjgxcvmcxnvm

I was able to implement this using SingleItemPeekableItemReader and it's working fine.

I am now trying to implement partitioning, as we need to process multiple files. I am not sure how the partitioner passes the file information to my custom reader, or how to make my SingleItemPeekableItemReader thread-safe, as it is not working correctly.

I need some input, as I am stuck at this point.

java-config

@Bean
      public Partitioner partitioner() {
          // Creates one partition per input resource; each partition's step execution
          // context exposes its resource so a step-scoped reader can pick it up.
          MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
          partitioner.setResources(resources);
          // Do NOT call partition(gridSize) here -- Spring Batch invokes it when the
          // master step runs. Set the key name instead, so each partition stores its
          // file under "file" for late binding: #{stepExecutionContext['file']}.
          partitioner.setKeyName("file");
          return partitioner;
      }
      @Bean
      public TaskExecutor taskExecutor() {
          // Fixed-size pool of 4 worker threads with a small bounded queue,
          // used to run the partitioned worker steps concurrently.
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
          executor.setCorePoolSize(4);
          executor.setMaxPoolSize(4);
          executor.setQueueCapacity(8);
          executor.afterPropertiesSet();
          return executor;
      }

      @Bean
      @Qualifier("masterStep")
      public Step masterStep() {
          // Master step: fans the partitions produced by partitioner() out to
          // step1 instances executed on the task executor's threads.
          Step workerStep = step1();
          return stepBuilderFactory.get("masterStep")
                  .partitioner("step1", partitioner())
                  .step(workerStep)
                  .taskExecutor(taskExecutor())
                  .build();
      }

     @Bean
      public MultiResourceItemReader<FieldSet> multiResourceItemReader() {
        // Reads every configured resource sequentially through the flat-file delegate.
        // NOTE(review): MultiResourceItemReader keeps the current resource as internal
        // state; sharing one instance across partition threads is presumably what
        // breaks the partitioned run -- with a MultiResourcePartitioner each partition
        // should instead get its own step-scoped reader bound to a single file.
        log.info("Total Number of Files to be process {}",resources.length);
        report.setFileCount(resources.length);
        MultiResourceItemReader<FieldSet> resourceItemReader = new MultiResourceItemReader<FieldSet>();     
        resourceItemReader.setResources(resources);     
        resourceItemReader.setDelegate(reader());       
        return resourceItemReader;
      }

    @Bean
    public FlatFileItemReader<FieldSet> reader() {
         // Line-level reader: routes each raw line through the pattern-matching
         // tokenizer (">" header lines vs. sequence lines) and maps it to a FieldSet.
         // Uses a gzip-aware BufferedReader factory so compressed inputs can be read.
         // NOTE(review): for partitioning this bean would need to be @StepScope with
         // the per-partition resource injected via late binding -- confirm against
         // the partitioner's key name.
         FlatFileItemReader<FieldSet> build = new FlatFileItemReaderBuilder<FieldSet>().name("fileReader")              
                .lineTokenizer(orderFileTokenizer())
                .fieldSetMapper(new FastFieldSetMapper())                   
                .recordSeparatorPolicy(new BlankLineRecordSeparatorPolicy())
                .build();        
         build.setBufferedReaderFactory(gzipBufferedReaderFactory);
         return build;
    }

    @Bean
    public SingleItemPeekableItemReader<FieldSet> readerPeek() {
        // Wraps the multi-resource reader so the next line can be inspected (peek)
        // without being consumed -- used to detect the start of the next record.
        // NOTE(review): the peeked item is held as internal state, so this reader is
        // presumably not safe to share across partition threads -- verify before
        // running partitioned.
        SingleItemPeekableItemReader<FieldSet> reader = new SingleItemPeekableItemReader<>();
        reader.setDelegate(multiResourceItemReader());
        return reader;
    }

    @Bean
    public MultiLineFastaItemReader itemReader() {
        // Aggregates tokenized lines into complete multi-line Fasta records.
        // The MultiResourceItemReader is passed in only so the record can be tagged
        // with the name of the file currently being read.
        MultiLineFastaItemReader itemReader = new MultiLineFastaItemReader(multiResourceItemReader());
        itemReader.setSingalPeekable(readerPeek());     
        return itemReader;
    }

    @Bean
    public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
        // Routes header lines (pattern ">*") to head() and every other line ("*")
        // to tail().
        Map<String, LineTokenizer> tokenizerByPattern = new HashMap<>(2);
        tokenizerByPattern.put(">*", head());
        tokenizerByPattern.put("*", tail());

        PatternMatchingCompositeLineTokenizer composite = new PatternMatchingCompositeLineTokenizer();
        composite.setTokenizers(tokenizerByPattern);
        return composite;
    }

    public DelimitedLineTokenizer head() {
        // Tokenizer for '>' header lines: captures the line as "sequenceIdentifier".
        // Non-strict so lines with fewer tokens than names do not fail.
        DelimitedLineTokenizer headerTokenizer = new DelimitedLineTokenizer();
        headerTokenizer.setDelimiter(" ");
        headerTokenizer.setStrict(false);
        headerTokenizer.setNames("sequenceIdentifier");
        return headerTokenizer;
    }

    public DelimitedLineTokenizer tail() {
        // Tokenizer for sequence-data lines: captures the line as "sequences".
        DelimitedLineTokenizer sequenceTokenizer = new DelimitedLineTokenizer();
        sequenceTokenizer.setDelimiter(" ");
        sequenceTokenizer.setNames("sequences");
        return sequenceTokenizer;
    }

    @Bean
    public FastReportWriter writer() {
        // Writer bean producing the aggregated Fasta report.
        FastReportWriter reportWriter = new FastReportWriter();
        return reportWriter;
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        // Job flow: run the partitioned master step first, then step2.
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(masterStep())
                .next(step2())
                .end()
                .build();
    }

    @Bean
    public Step step1() {
        // Worker step: reads multi-line Fasta records and writes the report in
        // chunks of 5000 items.
        return stepBuilderFactory.get("step1")
                .<Fasta, Fasta>chunk(5000)
                .reader(itemReader())
                .processor(new FastaIteamProcessor())
                .writer(writer())
                .build();
    }

public class MultiLineFastaItemReader implements ItemReader<Fasta>, ItemStream {

    private static final Logger log = LoggerFactory.getLogger(MultiLineFastaItemReader.class);

    // Peekable delegate used to look ahead at the next line without consuming it.
    private SingleItemPeekableItemReader<FieldSet> singalPeekable;

    // NOTE(review): never read or incremented in this class; kept for compatibility.
    AtomicInteger iteamCounter = new AtomicInteger(0);

    // Per-file record counters; concurrent map + AtomicInteger so counting stays
    // correct when the step runs on multiple partition threads.
    ConcurrentHashMap<String, AtomicInteger> fileNameAndCounterMap = new ConcurrentHashMap<>();

    @Autowired
    private SequenceFastaReport sequenceFastaReport;

    // Used only to report which resource (file) is currently being read.
    private MultiResourceItemReader<FieldSet> resourceItemReader;

    public MultiLineFastaItemReader(MultiResourceItemReader<FieldSet> multiResourceItemReader) {
        this.resourceItemReader = multiResourceItemReader;
    }

    public SingleItemPeekableItemReader<FieldSet> getSingalPeekable() {
        return singalPeekable;
    }

    public void setSingalPeekable(SingleItemPeekableItemReader<FieldSet> singalPeekable) {
        this.singalPeekable = singalPeekable;
    }

    /**
     * Reads one logical Fasta record: a ">" header line plus every following
     * sequence line, stopping (via peek) when the next header or the end of the
     * input is reached.
     *
     * @return the next complete record, or {@code null} when the input is exhausted
     * @throws Exception if the delegate reader fails
     */
    @Override
    public Fasta read() throws Exception {
        FieldSet item = singalPeekable.read();
        if (item == null) {
            return null; // no more lines in any resource
        }

        Fasta fastaObject = new Fasta();
        log.info("ID {} fileName {}", item.readString(0), resourceItemReader.getCurrentResource());
        fastaObject.setSequenceIdentifier(item.readString(0).toUpperCase());
        fastaObject.setFileName(resourceItemReader.getCurrentResource().getFilename());

        // computeIfAbsent is atomic on ConcurrentHashMap; the previous
        // containsKey()/put() pair could race between threads and lose counts.
        fileNameAndCounterMap.computeIfAbsent(fastaObject.getFileName(), key -> new AtomicInteger(0));

        while (true) {
            FieldSet possibleRelatedObject = singalPeekable.peek();

            // End of input, or the next line starts a new record: finish this one.
            if (possibleRelatedObject == null
                    || possibleRelatedObject.readString(0).startsWith(">")) {
                return finishRecord(fastaObject);
            }

            // Otherwise consume the peeked line and append it to the sequence data.
            String data = fastaObject.getSequences().toUpperCase();
            fastaObject.setSequences(data + singalPeekable.read().readString(0).toUpperCase());
        }
    }

    /** Validates a completed record, registers it in the report, and returns it. */
    private Fasta finishRecord(Fasta fastaObject) {
        if (fastaObject.getSequenceIdentifier().length() < 1) {
            throw new InvalidParameterException("Something wrong in file");
        }
        sequenceFastaReport.addToReport(
                fileNameAndCounterMap.get(fastaObject.getFileName()).incrementAndGet(),
                fastaObject.getSequences());
        return fastaObject;
    }

    @Override
    public void close() {
        this.singalPeekable.close();
    }

    @Override
    public void open(ExecutionContext executionContext) {
        this.singalPeekable.open(executionContext);
    }

    @Override
    public void update(ExecutionContext executionContext) {
        this.singalPeekable.update(executionContext);
    }
}

Upvotes: 2

Views: 2029

Answers (1)

Mahmoud Ben Hassine
Mahmoud Ben Hassine

Reputation: 31600

I am not sure how the partitioner is passing file info to my custom reader

The partitioner will create partition meta-data in step execution contexts and your reader should read that meta-data from it. In your example, you don't need to call partition on the partitioner, Spring Batch will do it. You need instead to set the partition key on the partitioner, for example:

  @Bean
  public Partitioner partitioner() {
      // One partition per resource; Spring Batch itself calls partition(gridSize)
      // when the master step runs, so it is not invoked here.
      MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
      partitioner.setResources(resources);          
      // Each partition's step execution context stores its resource under "file".
      partitioner.setKeyName("file");     
      return partitioner;
  }

This will create a partition for each file with the key file that you can get in your reader from the step execution context:

@Bean
@StepScope
// Step-scoped so the SpEL expression is evaluated once per partition, injecting
// the resource the MultiResourcePartitioner stored under the "file" key.
public FlatFileItemReader reader(@Value("#{stepExecutionContext['file']}") String file) {
    // define your reader 
}

Note that the reader should be step scoped to use this feature. More details here: https://docs.spring.io/spring-batch/4.0.x/reference/html/step.html#late-binding

Upvotes: 2

Related Questions