Reputation: 121
I want to test a kafka example, the producer:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerApp extends App {
  val topic = "topicTest"
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  for (i <- 0 to 125000) {
    // the producer is typed [String, String], so the value has to be a String
    val record = new ProducerRecord[String, String](topic, "key " + i, "value " + i)
    producer.send(record)
  }
  producer.flush()
  producer.close()
}
The consumer:
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConverters._

object ConsumerApp extends App {
  val topic = "topicTest"
  val properties = new Properties()
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](properties)
  consumer.subscribe(List(topic).asJava)
  while (true) {
    // rewind every assigned partition to the beginning before each poll
    consumer.seekToBeginning(consumer.assignment())
    val records: ConsumerRecords[String, String] = consumer.poll(20000)
    println("records size " + records.count())
  }
}
The topic "topicTest" is created with 1 partition.
The expected result is:
...
records size 125000
records size 125000
records size 125000
records size 125000
...
but the obtained result is:
...
records size 778
records size 778
records size 778
records size 778
...
The consumer does not read all the records from the topic, and I want to understand why. With a smaller number of records (20, for example), it works fine and the consumer reads them all. Is the size of the topic limited? Is there a Kafka configuration change that would allow processing a large number of records?
Upvotes: 0
Views: 1184
Reputation: 10075
There is a max.poll.records
consumer parameter, which defaults to 500 in Kafka 1.0.0, so a single poll() can never return the 125000 records you expect.
That is also why it works with 20 records, although the 778 you are getting is strange.
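Since each poll() returns at most max.poll.records records, you either raise that cap or, more robustly, keep polling and sum the counts until a poll comes back empty. Here is a minimal sketch of the second approach, assuming the same broker and topic as in the question (the group id "counter", the cap of 10000 and the 5000 ms timeout are just illustrative values):

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

object CountAllRecords extends App {
  val properties = new Properties()
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "counter")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  // raise the per-poll cap (default 500); one poll may still not return everything
  properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val consumer = new KafkaConsumer[String, String](properties)
  consumer.subscribe(List("topicTest").asJava)

  // keep polling and sum the batch counts until a poll returns nothing
  var total = 0L
  var batch = consumer.poll(5000)
  while (!batch.isEmpty) {
    total += batch.count()
    batch = consumer.poll(5000)
  }
  println("total records read: " + total)
  consumer.close()
}

With this loop the per-batch counts should add up to the total number of produced records, whatever value max.poll.records has.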
Upvotes: 1