theMadKing

Reputation: 2074

Spark Streaming GroupBy Parts of a Tuple to Process

I am running a spark streaming job that is running off of Kafka. I get messages in like this:

val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => {
  (mmd.topic, mmd.partition, mmd.offset, mmd.message)
})

Now as I bring data in, I want to group by topic and partition so that everything with the same topic/partition can be processed in one batch. What is the right function to use here?

messageStream.foreachRDD(x => x.?

Is it a groupBy? And if it is a groupBy, how do I group by the first 2 parts of the tuple? The KafkaRDD[0] will have many messages in it, so I want to group them into sets of like messages and then be able to process each grouping as a chunk vs individual messages.

Edit: So based on the feedback below, I would have something like this:

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => {
  ?
}))

Is it now in a K,V where K is (topic, partition) and the value is (offset, message)? I need the 1st and 2nd parts of the tuple because they will allow me to make an API call to get instructions on what to do with the message. What I don't want to do is call the API individually on each message, because a lot of them have the same instruction set based on the topic/partition.

Edit: Realized that it now comes as:

K:(Topic, Partition) V: CompactBuffer((Topic, Partition, Offset, Message), ()) etc.

messageStream.foreachRDD(rdd => rdd.groupBy(m => (m._1, m._2)).foreach(group => {
  val topic = group._1._1
  val partition = group._1._2
  group._2.foreach(msg => ...
}))
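To make the resulting shape concrete, here is a plain-Scala sketch (no Spark, no Kafka) using the same groupBy semantics on a local list of (topic, partition, offset, message) tuples. On an RDD the values materialize as a CompactBuffer rather than a List, but the key/value structure is the same; the message tuples here are made-up sample data.

```scala
// Sample tuples mirroring (topic, partition, offset, message)
val messages = List(
  ("topicA", 0, 1L, "m1"),
  ("topicA", 0, 2L, "m2"),
  ("topicB", 1, 5L, "m3")
)

// Group by the first two tuple elements, exactly as on the RDD
val grouped = messages.groupBy(m => (m._1, m._2))
// grouped: Map[(String, Int), List[(String, Int, Long, String)]]

grouped.foreach { case ((topic, partition), msgs) =>
  // one chunk per (topic, partition); msgs holds the full tuples
  println(s"$topic/$partition -> ${msgs.map(_._4)}")
}
```

Destructuring with `case ((topic, partition), msgs)` avoids the error-prone `x._1._1` style entirely.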

Upvotes: 0

Views: 145

Answers (1)

ryan

Reputation: 1084

To group by the first two parts of the tuple, you can try the following:

messageStream groupBy (x => (x._1, x._2))
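Since the goal is one instruction lookup per group rather than one per message, the grouped result can drive a per-group call. A minimal local sketch, where `getInstructions` is a hypothetical stub standing in for the real API call and the tuples are sample data:

```scala
// Hypothetical lookup: in the real job this would be the API call
// that returns the instruction set for a (topic, partition) pair.
def getInstructions(topic: String, partition: Int): String =
  s"rule-for-$topic-$partition"

val batch = List(
  ("orders", 0, 10L, "a"),
  ("orders", 0, 11L, "b"),
  ("clicks", 2, 7L, "c")
)

val results = batch.groupBy(m => (m._1, m._2)).map {
  case ((topic, partition), msgs) =>
    val rule = getInstructions(topic, partition) // one call per group
    (topic, partition) -> msgs.map(m => (m._3, rule)) // (offset, rule)
}
```

Inside `foreachRDD`, the same pattern applies to each grouped RDD, so messages sharing a topic/partition are processed as one chunk under a single instruction set.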

Upvotes: 1

Related Questions