DaliMidou

Reputation: 121

KafkaConsumer does not read all records from topic

I want to test a Kafka example. The producer:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerApp extends App {

  val topic = "topicTest"
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  // group.id is a consumer setting and is not needed on a producer
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  val producer = new KafkaProducer[String, String](props)
  // "0 to 125000" is inclusive and would send 125001 records
  for (i <- 0 until 125000) {
    // the value must be a String to match the StringSerializer configured above
    val record = new ProducerRecord(topic, "key " + i, "message " + i)
    producer.send(record)
  }
  producer.close()
}

The consumer:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecords, KafkaConsumer}
import scala.collection.JavaConverters._

object ConsumerApp extends App {
  val topic = "topicTest"
  val properties = new Properties
  properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
  properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
  properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  val consumer = new KafkaConsumer[String, String](properties)
  consumer.subscribe(List(topic).asJava)
  while (true) {
    // rewind to the beginning of every assigned partition on each iteration
    consumer.seekToBeginning(consumer.assignment())
    val records: ConsumerRecords[String, String] = consumer.poll(20000)
    println("records size " + records.count())
  }
}

The topic "topicTest" is created with 1 partition.

The expected result is:

...
records size 125000
records size 125000
records size 125000
records size 125000
...

but the obtained result is:

...
records size 778
records size 778
records size 778
records size 778
...

The consumer does not read all the records from the topic, and I want to understand why. However, if the number of records is smaller (20, for example), it works fine and the consumer reads all of them. Is the size of the topic limited? Is there a Kafka configuration change that would allow processing a large number of records?

Upvotes: 0

Views: 1184

Answers (1)

ppatierno

Reputation: 10075

There is a max.poll.records consumer parameter, which defaults to 500 in Kafka 1.0.0, so a single poll() cannot return the 125000 records you expect. That is why it works with 20 records, although the value 778 you observe is strange.
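A minimal sketch of the two usual fixes, assuming the consumer setup from the question (the value 125000 is illustrative; raising max.poll.records only lifts the per-poll cap, so the more robust approach is to keep the default and accumulate counts across successive polls):

```scala
// Option 1: raise the per-poll record limit (max.poll.records)
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "125000")

// Option 2: keep the default and sum record counts over repeated polls,
// stopping once a poll returns nothing within the timeout
consumer.seekToBeginning(consumer.assignment())
var total = 0L
var records = consumer.poll(1000)
while (records.count() > 0) {
  total += records.count()
  records = consumer.poll(1000)
}
println("total records " + total)
```

Note that a single poll is also bounded by fetch sizes (e.g. max.partition.fetch.bytes), so even a large max.poll.records does not guarantee the whole topic in one call; counting across polls does.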

Upvotes: 1
