Ankit
Ankit

Reputation: 2246

understanding consumer group id

I did fresh installation of Apache Kafka 0.10.1.0.

I was able to send / receive messages on command prompt.

While using Producer / Consumer Java Example, I am not able to know group.id parameter on Consumer Example.

Let me know on how to fix this issue.

Below is Consumer Example I had used:

public static void main(String[] args) {
             Properties props = new Properties();
             props.put("bootstrap.servers", "localhost:9092");
             props.put("group.id", "my-topic");
             props.put("enable.auto.commit", "true");
             props.put("auto.commit.interval.ms", "1000");
             props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             try {
                 consumer.subscribe(Arrays.asList("my-topic"));

                     ConsumerRecords<String, String> records = consumer.poll(100);
                     System.err.println("records size=>"+records.count());
                     for (ConsumerRecord<String, String> record : records) 
                         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());



              }
             catch (Exception ex){
                 ex.printStackTrace();
             }
            finally {
                 consumer.close();
            }
        }

After running the command for consumer, I can see the messages (on the console) posted by producer. But unable to see the messages from java program

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning

Upvotes: 49

Views: 130030

Answers (6)

MANISH PARGANIHA
MANISH PARGANIHA

Reputation: 172

The consumer group id the consumer group which should be defined in the Kafka consumer.properties file.

Do add "my-topic" to consumer group and it should work as below:

# consumer group id
group.id=my-topic-consumer-group

Upvotes: 3

Harpreet Varma
Harpreet Varma

Reputation: 506

Give any random value to group id. It doesn't matter.

props.put("group.id", "Any Random Value");

Upvotes: -15

Beginner
Beginner

Reputation: 51

Here are some test results on partition and consumer property group.id

 Properties props = new Properties();
  //set all other properties as required
  props.put("group.id", "ConsumerGroup1");
  props.put("max.poll.records", "1");
  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

consumer.group id is to load balance the produced data (if the group.id is different for each consumer, each consumer will get the copy of data)

if partition=1 and total consumers count = 2, only one out of two active consumer will get data

if partition=2 and total consumers count = 2, each of the two active consumers evenly get data

if partition=3 and total consumers count = 2, each of the two active consumers will get data. one consumer gets data from 2 partitions and other gets data from 1 partition.

if partition=3 and total consumers count = 3, each of the three active consumers evenly gets data.

Upvotes: 5

jazz
jazz

Reputation: 41

Since no offset was provided, the java client will wait for new messages but will not show existing messages - this is as expected. If one intends to read all the messages already in the topic one can use this piece of code:

if (READ_FROM_BEGINNING) {
    //consume all the messages from the topic from the beginning.
    //this doesn't work reliably if it consumer.poll(..) is not called first 
    //probably because of lazy-loading issues            
    consumer.poll(10);
    consumer.seekToBeginning(consumer.assignment()); //if intending to 
    //read from the beginning or call below to read from a predefined offset.
    //consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET);
}

Upvotes: 0

guest
guest

Reputation: 1

In the code you provided you just wait for data once for 100ms. You should receive the data in a loop or wait for longer period of time (you will only get one portion of data in this case). As for 'group.id' it the case you run consumer from console it gets random 'group.id'.

Upvotes: 0

Raz Omessi
Raz Omessi

Reputation: 1882

Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.

The group.id is a string that uniquely identifies the group of consumer processes to which this consumer belongs.

(Kafka intro)

Upvotes: 68

Related Questions