Raghav
Raghav

Reputation: 109

Kafka scala Consumer code - to print consumed records

As i am creating simple kafka consumer as below by using url :https://gist.github.com/akhil/6dfda8a04e33eff91a20 .

In that link, to print the consumed record, used a word "asScala" , that is not identified. and kindly , tell me, how to iterate the return type : ConsumerRecord[String,String] , which is poll() method's return type.

import java.util
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}

 
object KafkaConsumerEx extends App {

  val topic_name = "newtopic55"
  val consumer_group = "KafkaConsumerBatch"

  val prot = new Properties()
  prot.put("bootstrap.servers","localhost:9092")
  prot.put("group.id",consumer_group)
  prot.put("key.deserializer",  "org.apache.kafka.common.serialization.StringDeserializer")
  prot.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

  val kfk_consumer = new KafkaConsumer[String,String](prot)
  kfk_consumer.subscribe(util.Collections.singleton(topic_name))
  println("here")

   while(true){
    val consumer_record : ConsumerRecords[String, String]  = kfk_consumer.poll(100)
    println("records count : " + consumer_record.count())
    println("records partitions: " + consumer_record.partitions())
    consumer_record.iterator().


  }

}

Thanks in adv.

Upvotes: 2

Views: 4206

Answers (3)

user16313252
user16313252

Reputation: 11

while(true){
    val consumer_records = kfk_consumer.poll(100)
    val record_iter=consumer_record.iterator()
    while(record_iter.hasNext())
    {
       record=record_iter.next()
       println("records partitions: " + record.partition()
               "records_data:" + record.value())
    }
}
       

Upvotes: 1

Old Panda
Old Panda

Reputation: 1616

Add another answer since the scala.collection.JavaConversions was deprecated as mentioned at here.

As for this question, the code could be like

import scala.collection.JavaConverters._

for (record <- asScalaIterator(consumer_record.iterator)) {
  println(s"Here's your $record")
}

Upvotes: 1

mfirry
mfirry

Reputation: 3692

You can easily do that

for (record <- consumer_record.iterator()) {
  println(s"Here's your $record")
}

Remember to add this import:

import scala.collection.JavaConversions._

Upvotes: 6

Related Questions