Reputation: 127
I have a Kafka Consumer developed in spring-boot and i am able to read the messages from the topic. I want to integrate it with the Spring batch because i want to create an batch file. I am not sure how to do this.
Upvotes: 2
Views: 6785
Reputation: 524
Try like as below:
private static final Logger LOG = LoggerFactory.getLogger(Listener.class);
@KafkaListener(id = "batch-listener", topics = "${app.topic.batch}")
public void receive(@Payload List<String> messages,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
LOG.info("beginning to consume batch messages");
for (int i = 0; i < messages.size(); i++) {
LOG.info("received message='{}' with partition-offset='{}'",
messages.get(i), partitions.get(i) + "-" + offsets.get(i));
}
LOG.info("all batch messages consumed");
}
@EnableKafka
@Configuration
public class ListenerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
return factory;
}
}
ref: https://memorynotfound.com/spring-kafka-batch-listener-example/
Upvotes: 0
Reputation: 31600
Spring Batch added support to read/write data from/to Kafka topics in v4.2, see KafkaItemReader and KafkaItemWriter.
You can also take a look at the Spring Tips installment about Kafka support in Spring Batch by Josh Long.
Upvotes: 3