Sparker0i
Sparker0i

Reputation: 1851

Conditional Reduce on DStream

I have a DStream[RawWeatherData] object called parsedWeatherStream . Each object in RawWeatherData class will look like:

("725030:14732",2008,1,1,1,5.0,-3.3,1020.6,290,4.1,2,0.0,0.2,0.0)

Parameters of above object: (wsid, year, month, day, hour, temperature, dewpoint, pressure, windDirection, windSpeed, skyCondition, oneHourPrecip, sixHourPrecip)

Objects like this is what I will be getting into the Spark Streaming Context from Kafka.

My end goal is to add up the all the oneHourPrecip values for all objects in the DStream (highlighted in the object with bold on the right side), with a condition checking whether the bold values on the left (wsid, year, month, day) ,when comparing two objects in the stream, are same or not.

For this, I was able to do a map from the parsedStream, taking out only the objects I needed:

val newStream = parsedWeatherStream.map { weather =>
    (weather.wsid, weather.year, weather.month, weather.day, weather.oneHourPrecip)
}

Now I'm not sure what technique should I be using to sum up all the oneHourPrecip values. I have tried to conditionally do a reduce, but that ends up in an error:

// ERROR: Type mismatch
val transformedStream = newStream.reduce{(a , b) => {
    if (a._1 == b._1 && a._2 == b._2 && a._3 == b._3 && a._4 == b._4)
        (a._1 , a._2 , a._3 , a._4 , a._5 + b._5)
    else
        None //Would like to do Nothing here, so returning None
}}

I have also looked into the transform operation, but that also doesn't seem to help my cause.

Upvotes: 0

Views: 43

Answers (1)

Sparker0i
Sparker0i

Reputation: 1851

For the newStream object in my question above, I made a small, but subtle change. I added my keys (those 4 values) inside a parantheses, to represent my newStream as a key-value pair. Then all I had to do was a reduceByKey on it.

val newStream = parsedWeatherStream.map { weather =>
    ((weather.wsid, weather.year, weather.month, weather.day), weather.oneHourPrecip)
}.reduceByKey{(a , b) =>
    a + b
}

Upvotes: 1

Related Questions