Khai Ly
Khai Ly

Reputation: 145

Is there @KafkaListener in Reactor/Reactive Kafka?

I wish to write Reactive Kafa to listen on new coming message. But I don't know how to do that. Like @KafkaListener in blocking Kafka - it's waiting for new message

The code demo on Spring Boot Webflux and Reactor Kafka:

public class KafkaConsumer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String TOPIC = "Kafka_Example";

    private final ReceiverOptions<String, String> receiverOptions;

    public KafkaConsumer(String bootstrapServers) {

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-consumer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        receiverOptions = ReceiverOptions.create(props);

    }

    public Disposable consumeMessages(String topic) {

        ReceiverOptions<String, String> options = receiverOptions.subscription(Collections.singleton(topic))
                .addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions))
                .addRevokeListener(partitions -> System.out.println("onPartitionsRevoked " + partitions));

        Flux<ReceiverRecord<String, String>> kafkaFlux = KafkaReceiver.create(options).receive();
        return kafkaFlux.subscribe(record -> {
            ReceiverOffset offset = record.receiverOffset();
            System.out.printf("Received message: offset=%d key=%d value=%s\n", offset.offset(), record.key(), record.value());
            offset.acknowledge();
        });
    }

    public static void main(String[] args) throws Exception {
        KafkaConsumer consumer = new KafkaConsumer(BOOTSTRAP_SERVERS);
        consumer.consumeMessages(TOPIC);
    }
}

It's running and stop. I wish it's always waiting for new message.

Upvotes: 5

Views: 6381

Answers (2)

Pawel
Pawel

Reputation: 494

I need the same for my application. So far I found this issue on GitHub https://github.com/reactor/reactor-kafka/issues/100 (someone suggested a solution, I didn't test it yet).

There are future Kafka Reactor changes mentioned https://github.com/spring-projects/spring-kafka/pull/1123 but recently this Pull Request was closed.

Upvotes: 0

Jim C
Jim C

Reputation: 4385

package com.simplest.kafkaconsumer.services;



import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

@Component
public class ReactorKafkaReceiver {

    private static final Logger log = LoggerFactory.getLogger(ReactorKafkaReceiver.class.getName());

    private KafkaReceiver kafkaReceiver;

    public ReactorKafkaReceiver() {


        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "test");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        ReceiverOptions<Object, Object> consumerOptions = ReceiverOptions.create(consumerProps)
                .subscription(Collections.singleton("test"))
                .addAssignListener(partitions -> log.debug("onPartitionsAssigned {}", partitions))
                .addRevokeListener(partitions -> log.debug("onPartitionsRevoked {}", partitions));

        kafkaReceiver = KafkaReceiver.create(consumerOptions);

        ((Flux<ReceiverRecord>) kafkaReceiver.receive())
                .doOnNext(r -> {
                    System.out.println(r.value());
                    r.receiverOffset().acknowledge();
                })
                .subscribe();
    }

}

If you want to test: 1 - startzookeeper (default setups) 2 - start kafka server (default setups) 3 - create a topic named test 4 - produce a simple message 5 - create a simple project in spring initializer and add reactor-kafka dependence and obviously add accordingly the service above

I wondered if I should pasted as answer because your topic question is: Is there @KafkaListener in Reactor/Reactive Kafka? but after read twice your question it seems you are mainly facing issue to keep listening (It's running and stop. I wish it's always waiting for new message.)

Upvotes: 3

Related Questions