Justin Smith

Reputation: 81

Kafka Consumer Cluster Environment Offset

I am trying to have x consumers read from a specified topic in Kafka without consuming the same messages. For example, I want:

Consumer 1 picks up offset 1
Consumer 2 picks up offset 2
Consumer 1 picks up offset 3
Consumer 2 picks up offset 4

I want Kafka to act as a queue for these two consumers. I noticed the group.id setting and assumed that if both consumers used the same group, Kafka would handle this accordingly, but it does not seem to work the way I expected.

Here is the code I am using:

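    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Fields such as kafkaUrl, consumer, kafkaThreadPool, mqttMapping,
    // crudRepositoryStore and LOG are defined elsewhere in the class.

    public void init() throws UnknownHostException {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaUrl);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");
        // Every instance uses the same group, so the topics' partitions are shared out among them.
        props.put("group.id", "group1");
        props.put("client.id", "KafkaConsumer-" + InetAddress.getLocalHost().getHostAddress());

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("event1", "event2"));

        // Poll from a single scheduler thread: KafkaConsumer is not thread-safe.
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(this::pollTopics, 1, 10, TimeUnit.SECONDS);
    }

    public void pollTopics() {
        try {
            // poll(long) is deprecated in newer clients; poll(Duration) is the replacement.
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));

            for (ConsumerRecord<String, String> record : records) {
                // Hand each record to the application's own processing pool.
                AbstractProcessor processor = Processor.getProcessor(record.value(), record.topic(), mqttMapping, crudRepositoryStore);
                if (processor != null) {
                    kafkaThreadPool.execute(processor);
                }
            }
        } catch (Exception e) {
            LOG.error("Polling exception occurred", e);
        }
    }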

I want to be able to run this code in a cluster environment and have Kafka act as the queue: when one consumer pulls a message, the offset should advance at the same time, so that the next poll (by any consumer) grabs the next offset. Is this possible? If so, what am I doing wrong?

Upvotes: 0

Views: 832

Answers (1)

Matthias J. Sax

Reputation: 62310

That is not possible in Kafka (in the way you describe it).

If you use consumer groups, a single partition can only be read by a single consumer of the group. Kafka therefore scales by partitions: if you want multiple consumers (reading different data), you need at least one partition per consumer. If you have more partitions than consumers, some (or all) consumers will read multiple partitions at the same time.
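To make the "one partition, one consumer" rule concrete, here is a minimal sketch that splits the partitions of a single topic into contiguous ranges, one range per group member. It is only similar in spirit to Kafka's range-style assignment; `PartitionAssignmentSketch` and `assign` are hypothetical names, not the Kafka client's actual assignor code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionAssignmentSketch {

    // Hypothetical helper: splits partitions 0..numPartitions-1 of ONE topic into
    // contiguous ranges, so every partition is owned by exactly one consumer.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            // The first `extra` consumers get one additional partition.
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(consumers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = List.of("consumer-1", "consumer-2");
        // One partition: only one consumer of the group receives any data.
        System.out.println(assign(1, group)); // {consumer-1=[0], consumer-2=[]}
        // Two partitions: each consumer owns one partition.
        System.out.println(assign(2, group)); // {consumer-1=[0], consumer-2=[1]}
        // Three partitions: one consumer reads two partitions at the same time.
        System.out.println(assign(3, group)); // {consumer-1=[0, 1], consumer-2=[2]}
    }
}
```

The first case is what happens with a single-partition topic: the second consumer in the group stays idle, no matter how often it polls.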

The solution for you is to create a topic with multiple partitions (or to use multiple topics and let all consumers of your group subscribe to all of them).
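The following sketch simulates what the two-partition setup looks like from the consumers' side. It assumes, purely for illustration, that messages are spread round-robin over the partitions and that consumer-(p+1) owns partition p; `TwoPartitionQueueSketch` and `simulate` are hypothetical names, and no Kafka API is used.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TwoPartitionQueueSketch {

    // Simulates producing `numMessages` messages to a topic with `numPartitions`
    // partitions and returns what each consumer of the group would read.
    // Assumptions: round-robin distribution, and consumer-(p+1) owns partition p.
    static Map<String, List<String>> simulate(int numMessages, int numPartitions) {
        // One append-only log per partition; an offset is a position within that log.
        List<List<String>> logs = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            logs.add(new ArrayList<>());
        }
        for (int i = 1; i <= numMessages; i++) {
            int partition = (i - 1) % numPartitions;
            List<String> log = logs.get(partition);
            long offset = log.size(); // next offset within this partition only
            log.add("msg-" + i + "@p" + partition + "/offset" + offset);
        }
        // Each partition is read by exactly one consumer of the group.
        Map<String, List<String>> readBy = new LinkedHashMap<>();
        for (int p = 0; p < numPartitions; p++) {
            readBy.put("consumer-" + (p + 1), logs.get(p));
        }
        return readBy;
    }

    public static void main(String[] args) {
        simulate(6, 2).forEach((consumer, msgs) -> System.out.println(consumer + " reads " + msgs));
    }
}
```

The messages alternate between the two consumers much as the question asks, but offsets are tracked per partition, so each consumer sees offsets 0, 1, 2 of its own partition rather than one shared sequence. A real producer decides partitions differently (keyed records by a hash of the key; unkeyed records with the default partitioner, which is sticky rather than strictly round-robin since Kafka 2.4). If you choose the multiple-topic route instead, note that the range assignor assigns partitions topic by topic, so two single-partition topics can both land on the same consumer; setting `partition.assignment.strategy` to the round-robin assignor spreads them across the group.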

Upvotes: 1
