Reputation: 121
I want to test a Kafka example. I am using Kafka 0.10.0.1. The producer:
object ProducerApp extends App {
  val topic = "topicTest"
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  for (i <- 0 to 20) {
    val record = new ProducerRecord(topic, "key " + i, " value " + i)
    producer.send(record)
    Thread.sleep(100)
  }
}
The consumer (the topic "topicTest" is created with 1 partition):
object ConsumerApp extends App {
  val topic = "topicTest"
  val properties = new Properties
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](properties)
  consumer.subscribe(scala.List(topic).asJava)
  while (true) {
    consumer.seekToBeginning(consumer.assignment())
    val records: ConsumerRecords[String, String] = consumer.poll(20000)
    println("records size " + records.count())
    records.asScala.foreach(rec => println("offset " + rec.offset()))
  }
}
The problem is that the consumer does not read from offset 0 on the first iteration, although it does on every later iteration. I want to know why this happens and how I can make the consumer read from offset 0 on every iteration. The expected result is:
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
...
but the obtained result is:
records size 4
offset 2
offset 3
offset 4
offset 5
records size 6
offset 0
offset 1
offset 2
offset 3
offset 4
offset 5
...
Upvotes: 0
Views: 2489
Reputation: 901
I cannot tell exactly what the mistake is. I wrote the same code as yours, and for me it works fine. If you want, you can use the snippet below.
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object ConsumerExample extends App {
  val TOPIC = "test-stack"
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("group.id", "testinf")
  props.put("auto.offset.reset", "earliest")
  // "auto.offset.reset.config" is not a consumer setting; the key that
  // disables auto commit (as in the question) is "enable.auto.commit".
  props.put("enable.auto.commit", "false")

  // Log each partition assignment so it is visible when the consumer joins the group.
  val listener = new ConsumerRebalanceListener() {
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = {
      println("Assignment : " + partitions)
    }
    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = {
      // do nothing
    }
  }

  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(util.Collections.singletonList(TOPIC), listener)
  while (true) {
    // Rewind every currently assigned partition before each poll.
    consumer.seekToBeginning(consumer.assignment())
    val records = consumer.poll(20000)
    // for (record <- records.asScala) {
    //   println(record)
    // }
    println("records size " + records.count())
    records.asScala.foreach(rec => println("offset " + rec.offset()))
  }
}
Try it out and let me know if you have any issues.
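One possible explanation for the first iteration, offered as a guess rather than something confirmed in this thread: with subscribe(), partitions are only assigned inside poll(), so on the very first pass consumer.assignment() is still empty and seekToBeginning has nothing to rewind. The first poll then starts from whatever position the group resolves (a committed offset if one exists, otherwise auto.offset.reset). A minimal sketch of one way around this is to also rewind inside onPartitionsAssigned. The object name RewindingConsumerApp and the value name rewindOnAssign are made up for illustration; the topic, group id and broker address are taken from the question.

```scala
import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

// Hypothetical name; mirrors ConsumerApp from the question.
object RewindingConsumerApp extends App {
  val topic = "topicTest"

  val props = new Properties()
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](props)

  // Rewind as soon as partitions are handed to this consumer.
  // The callback runs inside poll(), before any records are returned.
  val rewindOnAssign = new ConsumerRebalanceListener {
    override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit =
      consumer.seekToBeginning(partitions)

    override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = ()
  }

  consumer.subscribe(util.Collections.singletonList(topic), rewindOnAssign)

  while (true) {
    // Has nothing to rewind on the first pass (assignment is still empty);
    // on later passes it rewinds the assigned partitions, as in the question.
    consumer.seekToBeginning(consumer.assignment())
    val records = consumer.poll(20000)
    println("records size " + records.count())
    records.asScala.foreach(rec => println("offset " + rec.offset()))
  }
}
```

If the topic really has a single partition and group rebalancing is not needed, another option would be to skip subscribe() and call consumer.assign() with an explicit TopicPartition, since the assignment is then known before the first poll.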
Upvotes: 1