Arjun A J

Reputation: 436

Apache Kafka: How to receive latest message from Kafka?

I am consuming and processing messages from Kafka in a consumer application built with Spark in Scala. Sometimes processing takes a little longer than usual. When that happens, I want to consume only the latest message, ignoring the earlier ones that the producer has published but that have not yet been consumed.

Here is my consumer code:

object KafkaSparkConsumer extends MessageProcessor {

  def main(args: scala.Array[String]): Unit = {
    val properties = readProperties()

    val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
    val ssc = new StreamingContext(streamConf, Seconds(1))

    val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
    val kafkaParams = Map(
      "metadata.broker.list"       -> properties.getProperty("broker_connection_str"),
      "zookeeper.connect"          -> properties.getProperty("zookeeper_connection_str"),
      "group.id"                   -> group_id,
      "auto.offset.reset"          -> properties.getProperty("offset_reset"),
      "zookeeper.session.timeout"  -> properties.getProperty("zookeeper_timeout"))

    val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc,
      kafkaParams,
      Map("moved_object" -> 1),
      StorageLevel.MEMORY_ONLY_SER
    ).map(_._2)

    msgStream.foreachRDD { rdd =>
      rdd.foreach { msg =>
        println("Message: " + msg)
        processMessage(msg)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Is there a way to make sure the consumer always gets the most recent message? Or do I need to set a property in the Kafka configuration to achieve this?

Any help on this would be greatly appreciated. Thank you.

Upvotes: 1

Views: 3689

Answers (4)

Mahesh Chand

Reputation: 3250

Yes, in Structured Streaming you can set the startingOffsets option to latest to consume only the latest messages:

val spark = SparkSession
  .builder
  .appName("kafka-reading")
  .getOrCreate()

import spark.implicits._

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "latest")
  .option("subscribe", topicName)
  .load()
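
Note that startingOffsets only applies when a query starts fresh; a query resuming from a checkpoint continues from the checkpointed offsets. If it helps, here is a minimal way to start the query and inspect the values (the console sink is assumed here purely for testing):

val query = df.selectExpr("CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()

query.awaitTermination()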

Upvotes: 0

vahid

Reputation: 1218

You can leverage two KafkaConsumer APIs to get the very last message from a partition (assuming log compaction won't be an issue):

  1. public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions): This gives you the end offset of the given partitions. Note that the end offset is the offset of the next message to be delivered.
  2. public void seek(TopicPartition partition, long offset): Run this for each partition, passing its end offset from the above call minus 1 (assuming it is greater than 0); see the sketch after this list.
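
A minimal sketch of this approach in Scala, calling the Java KafkaConsumer directly (broker address, topic name, group id, and deserializers are placeholder assumptions; endOffsets requires Kafka clients 0.10.1.0 or later):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
props.put("group.id", "latest-only-group")          // placeholder group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)

// Assign all partitions of the topic manually so we can seek on them.
val partitions = consumer.partitionsFor("moved_object").asScala
  .map(p => new TopicPartition(p.topic, p.partition))
consumer.assign(partitions.asJava)

// endOffsets returns the offset of the *next* message, so the last
// existing message sits at endOffset - 1.
consumer.endOffsets(partitions.asJava).asScala.foreach { case (tp, end) =>
  if (end > 0) consumer.seek(tp, end - 1)
}

// The next poll delivers only the most recent message per partition.
consumer.poll(1000).asScala.foreach(r => println(s"Latest: ${r.value}"))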

Upvotes: 1

Natalia

Reputation: 4532

The Kafka consumer API includes the method

void seekToEnd(Collection<TopicPartition> partitions)

So you can get the assigned partitions from the consumer and seek all of them to the end. There is a similar method, seekToBeginning. A minimal sketch:
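
The broker, group id, and topic below are placeholders, and poll(0) is used only as a common way to trigger partition assignment before seeking:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
props.put("group.id", "seek-to-end-group")          // placeholder group
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("moved_object"))   // placeholder topic

consumer.poll(0)                                    // triggers partition assignment
consumer.seekToEnd(consumer.assignment())           // skip everything already published

// Subsequent polls return only messages produced after the seek.
val records = consumer.poll(1000)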

Upvotes: 2

Honza Žíka

Reputation: 483

You can always generate a new (random) group id when connecting to Kafka. A new group has no committed offsets, so with auto.offset.reset set to latest the consumer starts from the end of the log and only sees messages published after it connects. A sketch follows below.
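
A minimal sketch of that idea (broker address and topic name are placeholder assumptions):

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.util.Random

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")    // placeholder broker
props.put("group.id", "consumer-" + Random.alphanumeric.take(8).mkString)
props.put("auto.offset.reset", "latest")            // no committed offsets -> start at log end
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Arrays.asList("moved_object"))   // placeholder topic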

Upvotes: 0
