AbNig
AbNig

Reputation: 352

Spring Batch: Reading from a JMS queue, step does not end

I have a simple bach job which reads from a JMS queue (ActiveMQ) and writes to a file. The batch job runs as expected and writes to the file honoring the commit interval which has been set to 10,000.

There are 2 observations in this regard

  1. The batch job reading queue does not end.

  2. I see that all the messages from the queue have been consumed but the last chunk gets written to file only when new messages are pushed to the JMS queue and the commit interval is met.

Is it the expected behavior? I would like to schedule the batch job and consume and write all the messages present in the queue at that point of time. Any advise?

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Bean
public TransactionAwareConnectionFactoryProxy activeMQConnectionFactory() {
    ActiveMQConnectionFactory amqConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    TransactionAwareConnectionFactoryProxy activeMQConnectionFactory = new TransactionAwareConnectionFactoryProxy(amqConnectionFactory);
    return activeMQConnectionFactory;
}

@Bean
public ActiveMQQueue defaultQueue() {
    return new ActiveMQQueue("firstQueue");
}

@Bean
public PlatformTransactionManager transactionManager() {
    return new ResourcelessTransactionManager();
}

@Bean
public JobRepository jobRepository(PlatformTransactionManager transactionManager) throws Exception {
    return new MapJobRepositoryFactoryBean(transactionManager).getObject();
}

@Bean
@DependsOn("jobRepository")
public SimpleJobLauncher simpleJobLauncher(JobRepository jobRepository) {
    SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
    simpleJobLauncher.setJobRepository(jobRepository);
    return simpleJobLauncher;
}   

If I set the receiveTimeout to a smaller number, all messages are not consumed, thus set to the upper limit.

@Bean
@DependsOn(value = { "activeMQConnectionFactory", "defaultQueue" })
public JmsTemplate firstQueueTemplate(ActiveMQQueue defaultQueue, TransactionAwareConnectionFactoryProxy activeMQConnectionFactory) {
    JmsTemplate firstQueueTemplate = new JmsTemplate(activeMQConnectionFactory);
    firstQueueTemplate.setDefaultDestination(defaultQueue);
    firstQueueTemplate.setSessionTransacted(true);
    firstQueueTemplate.setReceiveTimeout(Long.MAX_VALUE);
    return firstQueueTemplate;
}

Config for the batch job.

@Bean
public JmsItemReader<String> jmsItemReader(JmsTemplate firstQueueTemplate) {
    JmsItemReader<String> jmsItemReader = new JmsItemReader<>();
    jmsItemReader.setJmsTemplate(firstQueueTemplate);
    jmsItemReader.setItemType(String.class);
    return jmsItemReader;
}


@Bean
public ItemWriter<String> flatFileItemWriter() {
    FlatFileItemWriter<String> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource("/mypath/output.csv"));
    writer.setLineAggregator(new PassThroughLineAggregator<String>());
    return writer;
}

@Bean
@DependsOn(value = { "jmsItemReader", "jmsItemWriter", "jobRepository", "transactionManager" })
public Step queueReaderStep(JmsItemReader<String> jmsItemReader, ItemWriter<String> flatFileItemWriter, JobRepository jobRepository,
        PlatformTransactionManager transactionManager) throws Exception {
    StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository, transactionManager);
    AbstractTaskletStepBuilder<SimpleStepBuilder<String, String>> step = stepBuilderFactory.get("queueReaderStep").<String, String> chunk(10000)
            .reader(jmsItemReader).writer(flatFileItemWriter);
    return step.build();
}


@Bean
@DependsOn(value = { "jobRepository", "queueReaderStep" })
public Job jsmReaderJob(JobRepository jobRepository, Step queueReaderStep) {
    return this.jobBuilderFactory.get("jsmReaderJob").repository(jobRepository).incrementer(new RunIdIncrementer())
            .flow(queueReaderStep).end().build();
}

Upvotes: 0

Views: 3272

Answers (1)

Michael Minella
Michael Minella

Reputation: 21463

The JmsItemReader provided by Spring Batch is really meant as more of a template or example since, as you note, it never returns null so the step never ends. You'd need to write something to indicate that a given message indicated that the step was complete.

Upvotes: 3

Related Questions