vinay narayana

Reputation: 307

How can I write a Kafka consumer in Java without an infinite polling loop?

I created a Kafka consumer using this link as a reference. In the record-processing function there is a while(true) loop that polls for new events. If I use this in my project, the thread can't do anything else. Is there a way to avoid this infinite loop and still receive new events?

public static void main(String[] args) throws InterruptedException {
    System.out.println("Starting AtMostOnceConsumer ...");
    execute();
}
private static void execute() throws InterruptedException {
    KafkaConsumer<String, Event> consumer = createConsumer();
    // Subscribe to all partitions in the topic. 'assign' could be used here
    // instead of 'subscribe' to read from specific partitions.
    consumer.subscribe(Arrays.asList("topic"));
    processRecords(consumer);
}
private static KafkaConsumer<String, Event> createConsumer() {
    Properties props = new Properties();
    String consumeGroup = "group_id";
    props.put("group.id", consumeGroup);
    props.put("org.slf4j.simpleLogger.defaultLogLevel", "INFO");
    props.put("client.id", "clientId");
    props.put("security.protocol", "SASL_SSL");

    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "servers");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
    props.put("enable.auto.commit", "true");
    // Auto commit interval, kafka would commit offset at this interval.
    props.put("auto.commit.interval.ms", "101");
    // This limits the amount of data fetched per partition in each poll (in bytes),
    // which indirectly bounds the number of records returned.
    props.put("max.partition.fetch.bytes", "135");
    // Set this if you want to always read from beginning.
    // props.put("auto.offset.reset", "earliest");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "6001");
    props.put("schema.registry.url", "https://avroregistry.octanner.io");
    props.put("key.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("value.deserializer",
            "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    return new KafkaConsumer<String, Event>(props);
}
private static void processRecords(KafkaConsumer<String, Event> consumer) throws InterruptedException {
    while (true) {
        ConsumerRecords<String, Event> records = consumer.poll(TimeUnit.MINUTES.toMillis(1));
        long lastOffset = 0;
        for (ConsumerRecord<String, Event> record : records) {
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            lastOffset = record.offset();
        }
        System.out.println("lastOffset read: " + lastOffset);
        process();
    }
}
private static void process() throws InterruptedException {
    // create some delay to simulate processing of the message.
    Thread.sleep(TimeUnit.MINUTES.toMillis(1));
}

Can someone help me modify this so that I can avoid the while(true) loop and just listen for incoming events?

Upvotes: 3

Views: 7563

Answers (3)

OneCricketeer

Reputation: 192023

If you want the code to do multiple things at the same time, you need background threads.

To make this easier, you could use a higher-level Kafka library such as Spring (already answered), Vert.x, or SmallRye.

Here is a Vert.x example: first create a KafkaConsumer, then assign the handler and subscribe to your topic(s)

consumer.handler(record -> {
  System.out.println("Processing key=" + record.key() + ",value=" + record.value() +
    ",partition=" + record.partition() + ",offset=" + record.offset());
});

// subscribe to a single topic
consumer.subscribe("a-single-topic");

Upvotes: 1

Reddy_73

Reputation: 144

You could try something like this:

public class ConsumerDemoWithThread {
    private final Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class.getName());
    private final String bootstrapServers = "127.0.0.1:9092";
    private final String groupId = "my-first-application";
    private final String topic = "first-topic";

    private final KafkaConsumer<String, String> consumer = createConsumer(bootstrapServers, groupId, topic);

    private void pollForRecords() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(this::processRecords);
    }

    private KafkaConsumer<String, String> createConsumer(String bootstrapServers, String groupId, String topic) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // create consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // subscribe consumer to our topic(s)
        consumer.subscribe(Arrays.asList(topic));
        return consumer;
    }

    private void processRecords() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    logger.info("Key: " + record.key() + ", Value: " + record.value());
                    logger.info("Partition: " + record.partition() + ", Offset: " + record.offset());
                }
            }
        } catch (WakeupException e) {
            // consumer.wakeup() was called from another thread
            logger.info("Received shutdown signal!");
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        ConsumerDemoWithThread consumerDemoWithThread = new ConsumerDemoWithThread();
        consumerDemoWithThread.pollForRecords();
    }
}

Basically, as Joachim has mentioned, the entire poll-and-process logic needs to be delegated to a thread.

Upvotes: 5

Prashant Pandey

Reputation: 4682

You can use @KafkaListener (https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html). However, it will also poll in an infinite loop under the hood, because that is how Kafka was designed: it is not a queue but an event bus that stores records for some time, and there is no mechanism for it to push notifications to consumers.
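A minimal sketch of such a listener, assuming the spring-kafka dependency and placeholder topic and group names; Spring's listener container owns the poll loop and simply invokes the method once per record:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EventListener {

    // The container thread polls in the background; topic and groupId
    // here are placeholders for your own values.
    @KafkaListener(topics = "topic", groupId = "group_id")
    public void onEvent(String value) {
        System.out.println("Received: " + value);
    }
}
```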

Poll on a different thread and provide a graceful way to exit the loop.
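One sketch of that graceful exit, assuming the plain Kafka client: run the poll loop on a background thread and call consumer.wakeup() from a shutdown hook, which causes a blocked poll() to throw WakeupException. The class and method names are illustrative, not a specific API:

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumer {
    public static void run(KafkaConsumer<String, String> consumer) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.submit(() -> {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("offset=" + record.offset() + ", value=" + record.value());
                    }
                }
            } catch (WakeupException e) {
                // Expected on shutdown: wakeup() was called, exit cleanly.
            } finally {
                consumer.close();
            }
        });

        // wakeup() is the only KafkaConsumer method that is safe to call
        // from another thread; it interrupts a blocked poll().
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        executor.shutdown();
    }
}
```

The main thread stays free to do other work; the loop still exists, but it lives on its own thread and ends cleanly.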

Upvotes: 2
