This is my first time using Kafka with Scala. I am trying to consume data from a Kafka topic that a producer has already written to. My code is as follows:
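import scala.collection.JavaConverters._ // provides .asScala (scala.jdk.CollectionConverters._ on Scala 2.13+)
consumer.poll(5000).asScala.toList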
I already had 49 records in my Kafka topic. When I ran this, it returned seemingly random records. When I printed the offset and timestamp, it printed offsets from 0 to 21. But when I check the Kafka topic, the timestamp at offset 0 is different. Nor were they the last 21 records. I didn't check every record, only some from the end and from the start, but it means it didn't pick up the actual data at offset 0. Why is it behaving like that?
My consumer configs are as follows:
import java.util.Properties

val props = new Properties()
props.put("group.id", "test")
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("group.id", "consumer-group") // overrides the "test" value set above
Secondly, is it possible to get all the data that is already present, from the beginning? How can I get it?
Upvotes: 0
Views: 1429
Reputation: 9357
When I printed the offset and timestamp, it printed offsets from 0 to 21. But when I check the Kafka topic, the timestamp at offset 0 is different.
Offsets are assigned per partition, not per topic, and your topic might have multiple partitions.
You could have checked the timestamp of the 0th offset of partition 0 and that of the 0th offset of partition 1, which could be different.
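To make the per-partition numbering concrete, here is a toy in-memory model in plain Java. It is not the Kafka client API: the class ToyTopic, its methods and the timestamps are hypothetical, chosen only to show that every partition numbers its records from offset 0 independently. On a real consumer, printing record.partition() next to record.offset() and record.timestamp() makes the same point.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a topic (NOT the Kafka client API): each partition is
// an independent append-only log, so each one numbers its records from offset 0.
class ToyTopic {
    static final class Rec {
        final int partition;
        final long offset;
        final long timestamp;

        Rec(int partition, long offset, long timestamp) {
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "partition=" + partition + " offset=" + offset + " timestamp=" + timestamp;
        }
    }

    private final List<List<Rec>> partitions = new ArrayList<>();

    ToyTopic(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<Rec>());
        }
    }

    // The new record's offset is simply the current length of *that* partition.
    Rec append(int partition, long timestamp) {
        List<Rec> log = partitions.get(partition);
        Rec r = new Rec(partition, log.size(), timestamp);
        log.add(r);
        return r;
    }

    Rec get(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic(2);
        // Spread six records over two partitions (hypothetical timestamps).
        for (int i = 0; i < 6; i++) {
            topic.append(i % 2, 1000 + i);
        }
        System.out.println(topic.get(0, 0)); // partition=0 offset=0 timestamp=1000
        System.out.println(topic.get(1, 0)); // partition=1 offset=0 timestamp=1001
    }
}
```

Both lookups ask for "offset 0", yet they return different records with different timestamps, which is exactly the mismatch described in the question.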
Nor were they the last 21 records
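Ordering in Kafka is guaranteed per partition, i.e. all messages in one topic partition are in order; however, order is not guaranteed across multiple partitions of the same topic. The records a consumer returns are therefore not one topic-wide sequence, so the first 21 need not be the oldest or the newest 21 in the topic.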
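If you print partition, offset and timestamp for each returned record, a small check like the following separates the two properties: offsets should only increase within each partition, while timestamps across the whole result need not be sorted. OrderingCheck is a hypothetical helper, not part of Kafka, and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of Kafka): given records in the order a consumer
// returned them, check per-partition ordering versus topic-wide time ordering.
class OrderingCheck {
    // Each element is {partition, offset, timestamp}, e.g. copied from printed
    // record.partition(), record.offset() and record.timestamp() values.
    static boolean orderedWithinEachPartition(List<long[]> records) {
        Map<Long, Long> lastOffset = new HashMap<>();
        for (long[] r : records) {
            Long prev = lastOffset.get(r[0]);
            if (prev != null && r[1] <= prev) {
                return false; // offsets went backwards inside one partition
            }
            lastOffset.put(r[0], r[1]);
        }
        return true;
    }

    static boolean orderedByTimestampOverall(List<long[]> records) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i)[2] < records.get(i - 1)[2]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Partition 1's records returned before partition 0's (hypothetical values).
        List<long[]> polled = new ArrayList<>();
        polled.add(new long[] {1, 0, 1001});
        polled.add(new long[] {1, 1, 1003});
        polled.add(new long[] {0, 0, 1000});
        polled.add(new long[] {0, 1, 1002});
        System.out.println(orderedWithinEachPartition(polled)); // true
        System.out.println(orderedByTimestampOverall(polled));  // false
    }
}
```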
Secondly, is it possible to get all the data that is already present, from the beginning? How can I get it?
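Yes, it is possible to get all the data from the beginning. Check the KafkaConsumer#seekToBeginning() method. With subscribe(), partitions are only assigned once the consumer joins the group inside poll(), so consumer.assignment() is still empty right after subscribe(); the seek is therefore best done in a rebalance listener, for example:
consumer.subscribe(Collections.singletonList("topic"), new ConsumerRebalanceListener() {
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { consumer.seekToBeginning(partitions); }
});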
But what I read in the official docs is that poll() consumes data from the last offset only. Then how will changing the group fetch data from the beginning?
For a group with no committed offset, where poll() starts is governed by auto.offset.reset, which defaults to latest; however, you can change this config to earliest.
Remember that earliest means the beginning only if the consumer group has no initial offset, not otherwise. If the group has a committed offset, consumption resumes from it, i.e. with the first message the group has not yet consumed. That is why changing the group.id helps: a new group has no committed offsets, so earliest takes effect.
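The rule above can be sketched as a small function. This is a hypothetical model in plain Java, not a Kafka API; it assumes the documented behaviour that auto.offset.reset is consulted only when the group has no committed offset for a partition or that offset no longer exists on the broker (for example, deleted by retention). Kafka itself throws an exception under the none policy; the model uses IllegalStateException as a stand-in.

```java
// Hypothetical model (not a Kafka API) of where a consumer group starts reading
// one partition: the committed offset wins if it is still valid, otherwise the
// auto.offset.reset policy decides.
class StartOffset {
    enum Reset { EARLIEST, LATEST, NONE }

    // committed: the group's committed offset, or null if it has none
    // logStart:  first offset still stored in the partition
    // logEnd:    offset the next produced record will get
    static long startingOffset(Long committed, Reset reset, long logStart, long logEnd) {
        boolean valid = committed != null && committed >= logStart && committed <= logEnd;
        if (valid) {
            return committed; // resume with the first message not yet consumed
        }
        switch (reset) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("no valid committed offset and auto.offset.reset=none");
        }
    }

    public static void main(String[] args) {
        // A hypothetical partition holding offsets 0..48, so logEnd = 49.
        System.out.println(startingOffset(null, Reset.EARLIEST, 0, 49)); // 0  (new group, earliest)
        System.out.println(startingOffset(null, Reset.LATEST, 0, 49));   // 49 (new group, default policy)
        System.out.println(startingOffset(22L, Reset.EARLIEST, 0, 49));  // 22 (group already committed 22)
    }
}
```

The third call shows why setting earliest on an existing group appears to have no effect: the committed offset takes precedence.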
Also, subscriptions are needed only when you want to keep track of what has been consumed, so that you don't consume it again.
If you always want to read from the beginning, there is no need to subscribe to the topic. You could instead assign the topic's partitions to the consumer and seek to the beginning. Doing so bypasses consumer-group membership and rebalancing, and since you seek to the start every time, any offsets committed for the group are ignored.
consumer.assign(consumer.partitionsFor(topic).stream()
        .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
        .collect(Collectors.toSet()));
consumer.seekToBeginning(consumer.assignment()); // seek is applied lazily on the next poll()
Upvotes: 2