Reputation: 1851
I have a DStream[RawWeatherData] object called parsedWeatherStream. Each object of the RawWeatherData class looks like this:
("725030:14732",2008,1,1,1,5.0,-3.3,1020.6,290,4.1,2,0.0,0.2,0.0)
The fields of the object above are: (wsid, year, month, day, hour, temperature, dewpoint, pressure, windDirection, windSpeed, skyCondition, oneHourPrecip, sixHourPrecip)
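For reference, here is a rough sketch of what the RawWeatherData class presumably looks like. The field names are taken from the parameter list above; the exact types are my guesses, so treat this as hypothetical:

case class RawWeatherData(
  wsid: String,          // weather station id, e.g. "725030:14732"
  year: Int,
  month: Int,
  day: Int,
  hour: Int,
  temperature: Double,
  dewpoint: Double,
  pressure: Double,
  windDirection: Int,
  windSpeed: Double,
  skyCondition: Int,
  oneHourPrecip: Double,
  sixHourPrecip: Double
)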
Objects like this are what I will be receiving into the Spark Streaming context from Kafka.
My end goal is to add up all the oneHourPrecip values of the objects in the DStream, summing two objects together only when their key values (wsid, year, month, day) are the same.
For this, I was able to map over parsedWeatherStream, pulling out only the fields I needed:
val newStream = parsedWeatherStream.map { weather =>
  (weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
}
Now I'm not sure what technique I should use to sum up the oneHourPrecip values. I tried a conditional reduce, but that ends in an error:
// ERROR: Type mismatch -- both branches of the if must return the same type
val transformedStream = newStream.reduce { (a, b) =>
  if (a._1 == b._1 && a._2 == b._2 && a._3 == b._3 && a._4 == b._4)
    (a._1, a._2, a._3, a._4, a._5 + b._5)
  else
    None // I'd like to do nothing here, so returning None
}
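From what I can tell, the mismatch happens because reduce expects a function of type (T, T) => T, so both branches of the if have to return the same five-element tuple; mixing a tuple with None makes the compiler infer the common supertype Product with Serializable instead. A stripped-down version of the same inference problem (the Row alias and combine name are just for illustration):

type Row = (String, Int, Int, Int, Double)

// Both branches must yield Row for reduce to accept this function.
// Mixing a Row with None makes the compiler infer their common
// supertype, Product with Serializable -- hence the mismatch.
def combine(a: Row, b: Row) =
  if (a._1 == b._1 && a._2 == b._2 && a._3 == b._3 && a._4 == b._4)
    (a._1, a._2, a._3, a._4, a._5 + b._5)
  else
    None

And even if the types did line up, reduce collapses the whole batch down to a single value, so there is no way to "do nothing" for pairs whose keys differ.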
I have also looked into the transform operation, but that doesn't seem to help my cause either.
Upvotes: 0
Views: 43
Reputation: 1851
For the newStream object in my question above, I made a small but subtle change: I wrapped my keys (those four values) in parentheses, so that newStream becomes a stream of key-value pairs. Then all I had to do was a reduceByKey on it.
val newStream = parsedWeatherStream.map { weather =>
  ((weather.wsid, weather.year, weather.month, weather.day), weather.oneHourPrecip)
}.reduceByKey { (a, b) =>
  a + b
}
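To make the resulting type and usage explicit, here is a small sketch (stream setup omitted; parsedWeatherStream comes from Kafka as in the question):

import org.apache.spark.streaming.dstream.DStream

// newStream is a DStream of ((wsid, year, month, day), precip) pairs,
// with oneHourPrecip summed per key within each micro-batch
val precipPerDay: DStream[((String, Int, Int, Int), Double)] = newStream
precipPerDay.print() // e.g. ((725030:14732,2008,1,1),0.2)

Note that reduceByKey aggregates within each micro-batch; to keep a running total across batches you would need something like updateStateByKey or reduceByKeyAndWindow.

The transform operation mentioned in the question would also have worked, since it exposes each batch's underlying RDD; this is just an alternative sketch, not what I ended up using:

val viaTransform = parsedWeatherStream.transform { rdd =>
  rdd
    .map(w => ((w.wsid, w.year, w.month, w.day), w.oneHourPrecip))
    .reduceByKey(_ + _)
}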
Upvotes: 1