Reputation: 1372
I am new to Spring Batch. I have a requirement to read a Kafka stream, filter the data, and save it to a database. For that I used Spring Batch with a KafkaItemReader. When I start multiple jobs concurrently, I get a java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access error, and only the last job actually runs.
This is the spring batch config.
@Autowired
TaskExecutor taskExecutor;

@Autowired
JobRepository jobRepository;

@Bean
KafkaItemReader<Long, Event> kafkaItemReader() {
    Properties props = new Properties();
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.putAll(this.properties.buildConsumerProperties());
    return new KafkaItemReaderBuilder<Long, Event>()
            .partitions(0)
            .consumerProperties(props)
            .name("event-reader")
            .saveState(true)
            .topic(topicName)
            .build();
}

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor asyncTaskExecutor = new SimpleAsyncTaskExecutor("spring_batch");
    asyncTaskExecutor.setConcurrencyLimit(5);
    return asyncTaskExecutor;
}

@Bean(name = "JobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(taskExecutor);
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}
And there is a controller endpoint which starts new jobs. This is how I start a new job:
@Autowired
@Qualifier("JobLauncher")
private JobLauncher jobLauncher;

Map<String, JobParameter> items = new HashMap<>();
items.put("userId", new JobParameter("UserInputId"));
JobParameters parameters = new JobParameters(items);
try {
    jobLauncher.run(job, parameters);
} catch (Exception e) {
    e.printStackTrace();
}
I have seen that KafkaItemReader is not thread-safe. I want to know whether this approach is correct, or whether there is another way to read Kafka streams in a multi-threaded Spring Batch environment. Thanks & Regards
Upvotes: 1
Views: 2767
Reputation: 31600
The KafkaItemReader is documented as not thread-safe; here is an excerpt from its Javadoc:
Since KafkaConsumer is not thread-safe, this reader is not thread-safe.
So using it in a multi-threaded environment is incorrect and does not conform to the documentation. What you can do is use one reader per partition.
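As a rough sketch of the per-partition idea: make the reader step-scoped and pass the partition number as a job parameter, so each concurrent job execution builds its own reader (and therefore its own KafkaConsumer) bound to a single partition. The @StepScope annotation and the "partition" job parameter name here are illustrative assumptions, not taken from your config:

```java
// One reader per partition: each step execution gets a fresh reader that
// owns its own KafkaConsumer, so no consumer is shared across threads.
@Bean
@StepScope // new instance per step execution (assumption: reader used inside a step)
KafkaItemReader<Long, Event> partitionReader(
        @Value("#{jobParameters['partition']}") Long partition) {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    return new KafkaItemReaderBuilder<Long, Event>()
            .partitions(partition.intValue()) // read only this partition
            .consumerProperties(props)
            .name("event-reader-partition-" + partition)
            .saveState(true)
            .topic(topicName)
            .build();
}
```

You would then launch one job (or one partitioned step) per partition, passing a distinct partition value in the JobParameters, so the concurrent executions never touch the same consumer.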
Upvotes: 2
Reputation: 4614
As per the Spring documentation, it uses KafkaConsumer, which itself is not thread-safe, as explained in Kafka's own detailed documentation.
Please see if you can use one of the approaches mentioned there (i.e. decoupling consumption and processing, or one consumer per thread). In your example, you may need a separate handler for the task executor if you follow the decoupling approach.
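To illustrate the decoupling approach in plain Java: a single thread owns the non-thread-safe consumer and hands records to a worker pool over a BlockingQueue, so the workers never touch the consumer. The List.of(...) records below simulate what one consumer.poll() loop would return; in real code that single poller thread would own the KafkaConsumer:

```java
import java.util.List;
import java.util.concurrent.*;

public class DecoupledConsumption {
    // Hand-off queue between the single polling thread and the workers.
    static final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    static final ConcurrentLinkedQueue<String> processed = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) throws Exception {
        // Only this thread would touch the KafkaConsumer; here the records
        // are simulated instead of coming from consumer.poll(...).
        Thread poller = new Thread(() -> {
            for (String record : List.of("e1", "e2", "e3", "e4")) {
                queue.add(record);
            }
        });
        poller.start();
        poller.join();

        // Worker pool drains the queue; workers never see the consumer.
        ExecutorService workers = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            workers.submit(() -> processed.add(queue.take()));
        }
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("processed " + processed.size() + " records");
    }
}
```

The same shape applies in Spring Batch: keep consumption single-threaded (the reader) and parallelize only the processing/writing side.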
Upvotes: 1