Reputation: 2074
I am running a Spark Streaming job that reads from Kafka. I receive messages like this:
val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](
  ssc, getKafkaBrokers(), getKafkaTopics("raw"),
  (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.partition, mmd.offset, mmd.message)
)
Now as I bring data in, I want to group by topic and partition, so that everything with the same topic/partition can be processed in one batch. What is the right function to use here?
messageStream.foreachRDD(x => x.?
Is it a groupBy? And if it is a groupBy, how do I group by the first two parts of the tuple I have? Each KafkaRDD will have many messages in it, so I want to group them into sets of like messages and then be able to process each grouping as a chunk rather than as individual messages.
Edit: So based on the feedback below, I would have something like this:
messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
  ?
}))
Is it now a K,V pair where K is (topic, partition) and the value is (offset, message)? I need the first and second parts of the tuple because they will let me make an API call to get instructions on what to do with the messages. What I don't want to do is call the API individually on each message, because many of them have the same instruction set based on the topic/partition.
Edit: Realized that it comes now as:
K: (Topic, Partition), V: CompactBuffer((Topic, Partition, Offset, Message), (...)) etc.
messageStream.foreachRDD(rdd => rdd.groupBy(t => (t._1, t._2)).foreach(group => {
  val topic = group._1._1
  val partition = group._1._2
  group._2.foreach(msg => ...
}))
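The shape of that loop can be exercised with plain Scala collections, since RDD.groupBy has the same key/value semantics as Seq.groupBy. In this sketch, fetchInstructions is a hypothetical stand-in for the instruction-lookup API call, invoked once per (topic, partition) group rather than once per message:

```scala
object GroupDemo {
  // Sample records in the same (topic, partition, offset, message) shape as the stream
  val messages: Seq[(String, Int, Long, String)] = Seq(
    ("raw", 0, 100L, "a"),
    ("raw", 0, 101L, "b"),
    ("raw", 1, 200L, "c")
  )

  // Hypothetical stand-in for the API call that returns processing instructions
  def fetchInstructions(topic: String, partition: Int): String =
    s"instructions for $topic/$partition"

  def main(args: Array[String]): Unit = {
    // One groupBy keyed on (topic, partition); one API call per group
    messages.groupBy(m => (m._1, m._2)).foreach { case ((topic, partition), group) =>
      val instructions = fetchInstructions(topic, partition)
      group.foreach { case (_, _, offset, message) =>
        println(s"$topic/$partition@$offset -> $message [$instructions]")
      }
    }
  }
}
```

Destructuring the pair as ((topic, partition), group) avoids the nested `._1._1` accessors and the shadowed lambda parameters from the snippet above.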
Upvotes: 0
Views: 145
Reputation: 1084
To group by the first two parts of the tuple, you can try the following:
messageStream groupBy (x => (x._1, x._2))
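The resulting key/value shape can be checked against an ordinary Scala collection, where groupBy behaves the same way as on an RDD (the sample records below are made up for illustration):

```scala
// Records mirroring the (topic, partition, offset, message) tuples in the question
val records = Seq(
  ("topicA", 0, 1L, "m1"),
  ("topicA", 0, 2L, "m2"),
  ("topicB", 1, 1L, "m3")
)

// Keys become (topic, partition); values keep the full original tuples
val grouped = records.groupBy(r => (r._1, r._2))

println(grouped.keySet)           // two keys: (topicA,0) and (topicB,1)
println(grouped(("topicA", 0)))   // both topicA/partition-0 records
```

Note that the values are collections of the complete tuples, not just (offset, message), so the topic and partition are repeated inside each group.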
Upvotes: 1