Reputation: 436
I am consuming and processing messages from Kafka in a consumer application built with Spark in Scala. Sometimes processing takes a little longer than usual. When that happens, I need to consume only the latest message and ignore the earlier ones that the producer has already published but that have not yet been consumed.
Here is my consumer code:
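import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import scala.util.Random

// MessageProcessor (not shown) is assumed to provide readProperties() and processMessage()
object KafkaSparkConsumer extends MessageProcessor {
  def main(args: scala.Array[String]): Unit = {
    val properties = readProperties()
    val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
    val ssc = new StreamingContext(streamConf, Seconds(1)) // 1-second batches
    // A different group id on every run
    val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
    val kafkaParams = Map(
      "metadata.broker.list" -> properties.getProperty("broker_connection_str"),
      "zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
      "group.id" -> group_id,
      "auto.offset.reset" -> properties.getProperty("offset_reset"),
      "zookeeper.session.timeout" -> properties.getProperty("zookeeper_timeout"))
    // Receiver-based stream of (key, value) pairs; only the value is kept
    val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
      ssc,
      kafkaParams,
      Map("moved_object" -> 1), // topic -> number of receiver threads
      StorageLevel.MEMORY_ONLY_SER
    ).map(_._2)
    msgStream.foreachRDD { x =>
      x.foreach { msg =>
        println("Message: " + msg)
        processMessage(msg)
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}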
Is there any way to make sure the consumer always gets the most recent message in the consumer application? Or do I need to set any property in Kafka configuration to achieve the same?
Any help on this would be greatly appreciated. Thank you
Upvotes: 1
Views: 3689
Reputation: 3250
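Yes, you can set startingOffsets to latest to consume only the latest messages. Note that for a streaming query this option only applies when a new query is started; a restarted query resumes from where it left off.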
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("kafka-reading")
  .getOrCreate()
import spark.implicits._
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "latest")
  .option("subscribe", topicName) // topicName: the topic to read, assumed to be defined elsewhere
  .load()
Upvotes: 0
Reputation: 1218
You can leverage two KafkaConsumer APIs to get the very last message from a partition (assuming log compaction won't be an issue):
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
: This gives you the end offset of the given partitions. Note that the end offset is the offset of the next message to be delivered.
public void seek(TopicPartition partition, long offset)
: Run this for each partition and provide its end offset from above call minus 1 (assuming it's greater than 0).
Upvotes: 1
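To make the "end offset minus 1" arithmetic from the endOffsets/seek answer concrete, here is a minimal sketch that runs without a broker. The class LastMessageOffsets, the method seekTargets, and the use of plain integer partition numbers are made up for illustration. In a real consumer the input map would come from endOffsets, keyed by TopicPartition, and each result would be passed to seek.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the Kafka client library.
final class LastMessageOffsets {

    /**
     * Maps each partition's end offset to the offset of its last record.
     * Partitions whose end offset is 0 are left out: there is nothing to seek to.
     */
    static Map<Integer, Long> seekTargets(Map<Integer, Long> endOffsetsByPartition) {
        Map<Integer, Long> targets = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsetsByPartition.entrySet()) {
            long endOffset = entry.getValue(); // offset the NEXT message will get
            if (endOffset > 0) {
                targets.put(entry.getKey(), endOffset - 1);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        // Made-up end offsets: partition 0 has end offset 42, partition 1 is empty.
        Map<Integer, Long> endOffsets = Map.of(0, 42L, 1, 0L);
        System.out.println(seekTargets(endOffsets)); // prints {0=41}
    }
}
```

The empty partition is skipped rather than seeking to -1, which matches the "assuming it's greater than 0" condition in the answer.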
Reputation: 4532
The Kafka consumer API includes the method
void seekToEnd(Collection<TopicPartition> partitions)
So you can get the assigned partitions from the consumer and seek to the end for all of them. There is a similar method, seekToBeginning, for the opposite direction.
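A rough sketch of how that could look with the Java client, assuming kafka-clients 2.0 or newer is on the classpath and a consumer has already been created. The class SkipToLatest and its two method names are made up for illustration; only the Consumer methods being called come from the library. Keep in mind that seekToEnd is evaluated lazily, so the new position only takes effect on the next poll or position call.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper class, shown only to illustrate the calls.
final class SkipToLatest {

    /** Subscribes so that every newly assigned partition starts at its end. */
    static <K, V> void subscribeAtEnd(Consumer<K, V> consumer, String topic) {
        consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Nothing to clean up in this sketch.
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Skip whatever backlog exists on the partitions just assigned.
                consumer.seekToEnd(partitions);
            }
        });
    }

    /** Drops the current backlog, then prints whatever arrives within one second. */
    static <K, V> void skipBacklogAndPoll(Consumer<K, V> consumer) {
        consumer.seekToEnd(consumer.assignment());
        for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofSeconds(1))) {
            System.out.println("Message: " + record.value());
        }
    }
}
```

Calling skipBacklogAndPoll whenever processing has fallen behind would discard the unread messages. With the listener variant, the backlog is also skipped after every rebalance, which may or may not be what you want.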
Upvotes: 2
Reputation: 483
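You can always generate a new (random) group id each time you connect to Kafka. A new group has no committed offsets, so as long as auto.offset.reset is set to latest (largest for the old ZooKeeper-based consumer), you will start consuming only the messages published after you connect.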
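A minimal sketch of such a configuration, assuming the Java KafkaConsumer property names. The class FreshGroupConfig and the "latest-only-" prefix are made up for illustration. The older ZooKeeper-based consumer used in the question expects largest instead of latest.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper that builds consumer settings with a never-used group id.
final class FreshGroupConfig {

    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A group id that has never been used has no committed offsets...
        props.setProperty("group.id", "latest-only-" + UUID.randomUUID());
        // ...so this reset policy decides where consumption starts.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092");
        System.out.println(props.getProperty("group.id"));
    }
}
```

These properties would be passed to the consumer's constructor along with the key and value deserializer settings. This only helps at connect time: if the application falls behind while running, it still has to seek forward explicitly.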
Upvotes: 0