Sajith Vijesekara

Reputation: 1372

Spring Batch: KafkaConsumer is not safe for multi-threaded access

I am new to Spring Batch. I have a requirement to read a Kafka stream, filter the data, and save it in a database. For that I have used Spring Batch with KafkaItemReader. When I start multiple jobs, it throws java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access, and only the last started job keeps running.

This is the Spring Batch config:

    @Autowired
    TaskExecutor taskExecutor;

    @Autowired
    JobRepository jobRepository;

    // consumer properties and topic name referenced by the reader below
    @Autowired
    KafkaProperties properties;

    private String topicName;

    @Bean
    KafkaItemReader<Long, Event> kafkaItemReader() {
        Properties props = new Properties();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.putAll(this.properties.buildConsumerProperties());
        return new KafkaItemReaderBuilder<Long, Event>()
                .partitions(0)
                .consumerProperties(props)
                .name("event-reader")
                .saveState(true)
                .topic(topicName)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor(){
        SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
        asyncTaskExecutor.setConcurrencyLimit(5);
        return asyncTaskExecutor;
    }

    @Bean(name = "JobLauncher")
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

And there is a controller endpoint which starts new jobs. This is how I start a new job:

    @Autowired
    @Qualifier("JobLauncher")
    private JobLauncher jobLauncher;


    Map<String, JobParameter> items = new HashMap<>();
    items.put("userId", new JobParameter("UserInputId"));
    JobParameters parameters = new JobParameters(items);
    try {
        jobLauncher.run(job, parameters);
    } catch (Exception e) {
        e.printStackTrace();
    }

I have seen that KafkaItemReader is not thread-safe. I want to know whether this approach is correct, or whether there is another way to read Kafka streams in a multi-threaded Spring Batch environment. Thanks & Regards

Upvotes: 1

Views: 2767

Answers (2)

Mahmoud Ben Hassine

Reputation: 31600

The KafkaItemReader is documented as not thread-safe; here is an excerpt from its Javadoc:

Since KafkaConsumer is not thread-safe, this reader is not thread-safe.

So using it in a multi-threaded environment is incorrect and does not conform to the documentation. What you can do is use one reader per partition.
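
A minimal sketch of that idea, based on the configuration in the question: make the reader step-scoped and pass the partition number as a job parameter (the parameter name `partition` is an assumption, not something from the question), so each launched job builds its own KafkaItemReader, and therefore its own KafkaConsumer, for a single partition.

    @Bean
    @StepScope
    KafkaItemReader<Long, Event> kafkaItemReader(
            @Value("#{jobParameters['partition']}") Long partition) {
        Properties props = new Properties();
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        props.putAll(this.properties.buildConsumerProperties());
        // Each job execution gets its own reader (and its own KafkaConsumer),
        // bound to the single partition passed as a job parameter.
        return new KafkaItemReaderBuilder<Long, Event>()
                .partitions(partition.intValue())
                .consumerProperties(props)
                .name("event-reader-" + partition)
                .saveState(true)
                .topic(topicName)
                .build();
    }

Each job would then be launched with a distinct `partition` job parameter (for example `new JobParameter(0L)`, `new JobParameter(1L)`, ...), so no two running jobs share a consumer.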

Upvotes: 2

Ashish Patil

Reputation: 4614

As per the Spring documentation, KafkaItemReader uses KafkaConsumer, which itself is not thread-safe as per the detailed Kafka documentation.

Please see if you can use one of the approaches mentioned in that documentation (i.e. decoupling consumption from processing, or one consumer per thread). In your example, you may need a separate handler for the task executor if you follow the decoupling approach; a sketch of that idea is shown below.
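
For illustration only, a rough sketch of the decoupling approach with a plain KafkaConsumer (outside Spring Batch): one thread owns the consumer and does all the polling, and only the record processing is handed off to worker threads. `consumerProps`, `running`, `topicName` and `processRecord` are placeholders, not names from the question.

    // One thread owns the KafkaConsumer; workers only process handed-off records.
    ExecutorService workers = Executors.newFixedThreadPool(5);

    try (KafkaConsumer<Long, Event> consumer = new KafkaConsumer<>(consumerProps)) {
        consumer.subscribe(Collections.singletonList(topicName));
        while (running) {
            ConsumerRecords<Long, Event> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<Long, Event> record : records) {
                // The consumer itself is never touched from the worker threads.
                workers.submit(() -> processRecord(record));
            }
        }
    }
    workers.shutdown();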

Upvotes: 1
