Sonia

Reputation: 127

Integrate a Kafka consumer with Spring Batch

I have a Kafka consumer developed in Spring Boot, and I am able to read messages from the topic. I want to integrate it with Spring Batch because I want to create a batch file from the consumed messages. I am not sure how to do this.

Upvotes: 2

Views: 6785

Answers (2)

Sushil Mittal

Reputation: 524

Try something like the following batch listener and its configuration:

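import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component // the listener must be a Spring bean for @KafkaListener to be detected
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    // With a batch listener, payloads and headers arrive as parallel lists (one entry per record).
    // Note: RECEIVED_PARTITION_ID was renamed RECEIVED_PARTITION in Spring Kafka 3.0.
    @KafkaListener(id = "batch-listener", topics = "${app.topic.batch}")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
        LOG.info("beginning to consume batch messages");

        for (int i = 0; i < messages.size(); i++) {
            LOG.info("received message='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));
        }
        LOG.info("all batch messages consumed");
    }
}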



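import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        // Upper bound on records returned by one poll, i.e. the maximum batch size.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Deliver each poll result to the listener as a single List instead of record by record.
        factory.setBatchListener(true);
        // Follows the referenced example; newer Spring Kafka versions configure error handling differently.
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        return factory;
    }

}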

ref: https://memorynotfound.com/spring-kafka-batch-listener-example/
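
The listener above only logs each batch. Since the goal is a batch file, one option is to append every received batch to a file. Here is a minimal sketch using only the JDK; the class name BatchFileAppender and the target path are hypothetical and not part of Spring Kafka:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical helper: appends each consumed batch to a text file, one message per line.
class BatchFileAppender {

    private final Path target;

    BatchFileAppender(Path target) {
        this.target = target;
    }

    // synchronized because a listener container may invoke the listener from several consumer threads
    synchronized void append(List<String> messages) throws IOException {
        // CREATE makes the file on first use, APPEND keeps earlier batches intact
        Files.write(target, messages, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

Inside receive(...) you would call something like appender.append(messages) after the logging loop. This gives you a plain file but none of Spring Batch's restartability or chunk handling.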

Upvotes: 0

Mahmoud Ben Hassine

Reputation: 31600

Spring Batch added support for reading from and writing to Kafka topics in v4.2; see KafkaItemReader and KafkaItemWriter.

You can also take a look at the Spring Tips installment about Kafka support in Spring Batch by Josh Long.
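
For the "read from a topic and create a file" case, a rough sketch of the reader and writer beans could look like the following. The broker address, topic name, partition number, group id, and output path are all placeholder assumptions, and the config class name is made up; adjust them to your setup:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

@Configuration
public class KafkaToFileConfig { // hypothetical name

    @Bean
    public KafkaItemReader<String, String> kafkaItemReader() {
        // The reader requires these four consumer properties to be present.
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");                   // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        return new KafkaItemReaderBuilder<String, String>()
                .name("kafkaItemReader")
                .consumerProperties(props)
                .topic("my-topic")   // assumed topic name
                .partitions(0)       // partitions are assigned explicitly; partition 0 is an assumption
                .saveState(true)     // keep offsets in the execution context so a restart can resume
                .build();
    }

    @Bean
    public FlatFileItemWriter<String> fileWriter() {
        return new FlatFileItemWriterBuilder<String>()
                .name("fileWriter")
                .resource(new FileSystemResource("output/messages.txt")) // assumed output path
                .lineAggregator(new PassThroughLineAggregator<>())       // writes each message as one line
                .build();
    }
}
```

These two beans would then be used as the reader and writer of a chunk-oriented step. How you build the step depends on your Spring Batch version, so that part is left out here.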

Upvotes: 3
