Reputation: 2246
I did a fresh installation of Apache Kafka 0.10.1.0.
I was able to send / receive messages from the command prompt.
While using the Producer / Consumer Java examples, I am not able to understand the group.id parameter in the Consumer example.
Please let me know how to fix this issue.
Below is the Consumer example I used:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-topic");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        try {
            consumer.subscribe(Arrays.asList("my-topic"));
            // Polls only once, waiting at most 100 ms for records
            ConsumerRecords<String, String> records = consumer.poll(100);
            System.err.println("records size=>" + records.count());
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
After running the console consumer command below, I can see the messages posted by the producer on the console, but I am unable to see them from the Java program:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-topic --from-beginning
Upvotes: 49
Views: 130030
Reputation: 172
The consumer group id identifies the consumer group, which should be defined in Kafka's consumer.properties file.
Add a consumer group for "my-topic" as shown below, and it should work:
# consumer group id
group.id=my-topic-consumer-group
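The `consumer.properties` file is read by the command-line tools when it is passed to them explicitly (for example with the console consumer's `--consumer.config` option); a Java client does not load it on its own. A minimal sketch of reading the same `key=value` format into `java.util.Properties`, which could then be handed to `new KafkaConsumer<>(props)`. The class name `ConsumerPropertiesLoader` is hypothetical, and the file contents are read from a string so the example runs without a file on disk:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class ConsumerPropertiesLoader {

    // Parses text in the consumer.properties format ("key=value" lines,
    // "#" comments) into Properties that could be passed to a KafkaConsumer.
    static Properties load(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Fail early if no group id is configured: group features such as
        // subscribe() and offset commits depend on it.
        if (props.getProperty("group.id") == null) {
            throw new IllegalArgumentException("group.id is not set");
        }
        return props;
    }

    public static void main(String[] args) {
        String fileContents =
                "bootstrap.servers=localhost:9092\n"
                + "# consumer group id\n"
                + "group.id=my-topic-consumer-group\n";
        Properties props = load(new StringReader(fileContents));
        System.out.println(props.getProperty("group.id")); // prints my-topic-consumer-group
    }
}
```

In a real program the `StringReader` would be replaced by something like `new FileReader("config/consumer.properties")`; that path is an assumption about where the file lives.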
Upvotes: 3
Reputation: 506
Give any random value to group id. It doesn't matter.
props.put("group.id", "Any Random Value");
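A sketch of generating such a value, assuming a per-run unique id is what you want. Worth knowing: every new `group.id` starts a brand-new group with no committed offsets, so where it begins reading is decided by `auto.offset.reset` (default `latest`), and consumers with different ids do not share the topic's partitions. The helper `newGroupId` and the `my-app` prefix are hypothetical:

```java
import java.util.UUID;

class RandomGroupId {

    // Hypothetical helper: returns a group id that is unique per call,
    // e.g. "my-app-" followed by a random UUID. The prefix only aids readability.
    static String newGroupId(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        String groupId = newGroupId("my-app");
        System.out.println(groupId);
        // In the consumer: props.put("group.id", groupId);
    }
}
```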
Upvotes: -15
Reputation: 51
Here are some test results on partitions and the consumer property group.id:
Properties props = new Properties();
//set all other properties as required
props.put("group.id", "ConsumerGroup1");
props.put("max.poll.records", "1");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
The consumer group.id is used to load balance the produced data (if the group.id is different for each consumer, each consumer will get a copy of the data). With all consumers in the same group:
If partitions = 1 and total consumer count = 2, only one of the two active consumers will get data.
If partitions = 2 and total consumer count = 2, each of the two active consumers gets data evenly.
If partitions = 3 and total consumer count = 2, each of the two active consumers will get data: one consumer gets data from 2 partitions and the other gets data from 1 partition.
If partitions = 3 and total consumer count = 3, each of the three active consumers gets data evenly.
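The numbers above follow from simple arithmetic. A simplified model, assuming a single topic and the range-style assignment used by Kafka's default `RangeAssignor`: each consumer gets `partitions / consumers` partitions and the first `partitions % consumers` consumers get one extra. It is an illustration, not the client's actual assignor code; `PartitionSpread` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

class PartitionSpread {

    // Spreads partitions 0..partitions-1 over the consumers of one group:
    // everyone gets partitions / consumers, and the first
    // (partitions % consumers) consumers get one extra partition.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        int[][] cases = {{1, 2}, {2, 2}, {3, 2}, {3, 3}};
        for (int[] c : cases) {
            System.out.println("partitions=" + c[0] + ", consumers=" + c[1]
                    + " -> " + assign(c[0], c[1]));
        }
    }
}
```

Running `main` prints `[[0], []]` for 1 partition and 2 consumers (one consumer idle) and `[[0, 1], [2]]` for 3 partitions and 2 consumers, matching the observations above.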
Upvotes: 5
Reputation: 41
Since no offset was provided, the Java client waits for new messages but does not show existing messages; this is expected. If you intend to read all the messages already in the topic, you can use this piece of code:
if (READ_FROM_BEGINNING) {
    // Consume all the messages from the topic from the beginning.
    // This doesn't work reliably if consumer.poll(..) is not called first,
    // probably because of lazy loading: with subscribe(), partitions are only assigned during poll().
    consumer.poll(10);
    // Read from the beginning...
    consumer.seekToBeginning(consumer.assignment());
    // ...or call this instead to read from a predefined offset:
    // consumer.seek(consumer.assignment().iterator().next(), READ_FROM_OFFSET);
}
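Another way to read existing messages, assuming a group that has no committed offsets yet (for example a `group.id` that has never been used), is the standard consumer setting `auto.offset.reset=earliest`, which avoids the extra `poll` before seeking. A sketch that only builds the `Properties`; the helper name `fromBeginning` and the group id value are hypothetical:

```java
import java.util.Properties;

class ReadFromBeginningConfig {

    // Consumer settings that start from the earliest available offset.
    // auto.offset.reset only takes effect when the group has no committed
    // offset for a partition (or the committed offset is no longer valid).
    static Properties fromBeginning(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = fromBeginning("localhost:9092", "my-topic-reader-1");
        System.out.println(props.getProperty("auto.offset.reset")); // prints earliest
    }
}
```

Once that group has committed offsets, later runs with the same `group.id` resume from them instead, so the seek-based code above is still the option for re-reading on demand.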
Upvotes: 0
Reputation: 1
In the code you provided, you wait for data only once, for 100 ms. You should receive the data in a loop, or wait for a longer period of time (in which case you will only get one batch of data). As for 'group.id': when you run the consumer from the console, it gets a random 'group.id'.
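The loop suggestion can be sketched without a running broker. In real code the body would be `consumer.poll(100)` inside a `while` loop, iterating over each returned `ConsumerRecords` batch. Here the poll call is abstracted behind a `LongFunction` so the loop logic can be exercised with a fake source; `pollUntil`, its expected-count stopping rule and `maxPolls` are hypothetical choices for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.LongFunction;

class PollLoop {

    // Calls poll repeatedly (instead of once) until `expected` records have
    // arrived or `maxPolls` attempts are used up. `poll` stands in for
    // consumer.poll(timeoutMs) and returns the record values of one batch.
    static List<String> pollUntil(LongFunction<List<String>> poll,
                                  long timeoutMs, int expected, int maxPolls) {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < maxPolls && received.size() < expected; i++) {
            received.addAll(poll.apply(timeoutMs));
        }
        return received;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fake source: empty for the first two polls (as can happen while the
        // consumer is still joining its group), then one batch of two records.
        LongFunction<List<String>> fake = timeoutMs -> {
            calls[0]++;
            return calls[0] == 3 ? Arrays.asList("a", "b") : Collections.<String>emptyList();
        };
        List<String> values = pollUntil(fake, 100, 2, 10);
        System.out.println(values + " after " + calls[0] + " polls"); // prints [a, b] after 3 polls
    }
}
```

A single poll in this model would have returned nothing, which is the situation described in the question.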
Upvotes: 0
Reputation: 1882
Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
If all the consumer instances have different consumer groups, then each record will be broadcast to all the consumer processes.
The group.id is a string that uniquely identifies the group of consumer processes to which this consumer belongs.
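The two cases above (same group versus different groups) can be shown with a toy model. It assumes every consumer subscribes to the same topic and, for simplicity, picks the receiving consumer inside a group round-robin per record, whereas Kafka actually routes by partition. `GroupDelivery` and `deliver` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupDelivery {

    // Toy model of consumer-group delivery. `consumerToGroup` maps each
    // consumer name to its group.id. Each record reaches every group, but only
    // one consumer per group (chosen round-robin here, not by partition).
    static Map<String, List<String>> deliver(Map<String, String> consumerToGroup,
                                             List<String> records) {
        Map<String, List<String>> members = new LinkedHashMap<>();
        Map<String, List<String>> inbox = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : consumerToGroup.entrySet()) {
            members.computeIfAbsent(e.getValue(), g -> new ArrayList<>()).add(e.getKey());
            inbox.put(e.getKey(), new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            for (List<String> group : members.values()) {
                String target = group.get(i % group.size());
                inbox.get(target).add(records.get(i));
            }
        }
        return inbox;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("r0", "r1", "r2", "r3");

        Map<String, String> sameGroup = new LinkedHashMap<>();
        sameGroup.put("c1", "g1");
        sameGroup.put("c2", "g1");
        // Load balanced: {c1=[r0, r2], c2=[r1, r3]}
        System.out.println(deliver(sameGroup, records));

        Map<String, String> differentGroups = new LinkedHashMap<>();
        differentGroups.put("c1", "g1");
        differentGroups.put("c2", "g2");
        // Broadcast: {c1=[r0, r1, r2, r3], c2=[r0, r1, r2, r3]}
        System.out.println(deliver(differentGroups, records));
    }
}
```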
Upvotes: 68