Small question about the flow of my Spark Streaming program.
I have this function:
def parse(msg: String): Seq[String]
It splits a "good" message into multiple strings and, if the message is "bad", returns an empty Seq.
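For illustration, here is a minimal sketch of what such a function might look like. The comma-delimited format is purely an assumption for the example, not the actual message format:

def parse(msg: String): Seq[String] =
  if (msg.contains(","))   // hypothetical "good" check: comma-delimited payload
    msg.split(",").toSeq   // split a good message into its parts
  else
    Seq.empty              // bad message: signal it with an empty Seq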
I'm reading messages from a Kafka topic, and I want to send the results of the parsing to two different topics: if the message is "good", send the result of the parsing to the topic "good_msg_topic"; if the message is "bad", send the original message to the topic "bad_msg_topic".
To achieve that, I did this:
stream.foreachRDD(rdd => {
  val res = rdd.map(msg => msg.value() -> parse(msg.value()))

  res.foreach(pair => {
    if (pair._2.isEmpty) {
      producer.send(junkTopic, pair._1)
    } else {
      pair._2.foreach(m => producer.send(splitTopic, m))
    }
  })
})
However, I feel like this is not optimal: pairing each original message with its parse result in a tuple, just to keep both available, may slow down the process...
I'm new to Spark and Scala, so I suspect this could be done better.
Any idea on how I could improve this? Changing the signature of the parse function is also possible if you think that would help.
Thank you
I wouldn't be too concerned about performance unless you've already measured this and found a bottleneck.
One thing I can think of which might make this code clearer is to use an ADT to describe the result type:
sealed trait Result
case class GoodResult(seq: Seq[String]) extends Result
case class BadResult(original: String) extends Result
Have parse return a Result:

def parse(s: String): Result
And then use map on the DStream instead of on the RDD:
stream
  .map(msg => parse(msg.value()))
  .foreachRDD { rdd =>
    rdd.foreach { result =>
      result match {
        case GoodResult(seq)     => seq.foreach(value => producer.send(splitTopic, value))
        case BadResult(original) => producer.send(junkTopic, original)
      }
    }
  }
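One caveat that applies to both versions: producer is referenced inside code that runs on the executors, so it has to be serializable or created there. A common pattern is to create one producer per partition with foreachPartition. Here is a sketch, where createProducer() is a hypothetical factory and producer.send/close mirror the calls used in the question:

stream
  .map(msg => parse(msg.value()))
  .foreachRDD { rdd =>
    rdd.foreachPartition { results =>
      val producer = createProducer()  // hypothetical factory; runs once per partition, on the executor
      results.foreach {
        case GoodResult(seq)     => seq.foreach(value => producer.send(splitTopic, value))
        case BadResult(original) => producer.send(junkTopic, original)
      }
      producer.close()
    }
  }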