Reputation: 499
I'm trying to sort out how does reduceByKey functionate but this case is confusing me and I can't understand it at all.
The code is:
stream.foreachRDD((rdd: RDD[Record]) => {
// convert string to PoJo and generate rows as tuple group
val pairs = rdd
.map(row => (row.timestamp(), jsonDecode(row.value())))
.map(row => (row._2.getType.name(), (1, row._2.getValue, row._1)))
val flatten = pairs
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2, (y._3 + x._3) / 2))
.map(f => Row.fromSeq(Seq(f._1, f._2._2 / f._2._1, new Timestamp(f._2._3))))
Imagine data income: ["oceania", 500], ["australia", 450] and etc.
In flatten variable I'm trying to aggregate data by market type or this first type in JSON. Here is generating tuple: * the first one is counter value and this value is 1,
* second one is the rate and received from Kafka,
* third one is event time. for instance 2017-05-12 16:00:00
*
* in the map,
* method f._1
is market name,
* we divide total rate to total item count f._2._2 / f._2._1
* as you can see f._2._3
is average event time
Can someone help me explain what does mean f._2._3
(I mean I know its temp variable but what is in there or could be) and how is total rate counting by dividing f._2._2 / f._2._1
, what is dividing exactly? Thank you :)
Upvotes: 0
Views: 1610
Reputation: 5305
For each row you define the following element in your RDD pairs
:
(marketType, (counter, rate, eventTime))
Note that this is a Tuple2
whose second element is a Tuple3
. Tuple
s are special case classes whose n
-th element (starting at 1
) is named _n
. For instance, to access the rate
of an element f
, you will have to do f._2._2
(the second elemement of the Tuple3
, which is the second element of the Tuple2
).
Since your elements have special meaning, you might want to consider defining a case class MyRow(counter: Int, rate: Int, time: Timestamp)
, in order to have a clearer view on what your code is doing when you write something like f._2._3
(by the way, the type of eventTime
is not clear to me, since you have only represented it as String
, but you do numerical operations on it).
Now to what your code really attempts to do:
The reducing function takes two Tuple3
(or MyRow
, if you change your code) and outputs another one (here, your reducing function sums over the counters, the rates, and makes the average between two values on the eventTime).
reduceByKey
applies this reducing function as long as it finds two elements with the same key: since the output of the reducing function is of the same type than its inputs, it can be applied on it, as long as you have other values on your RDD that has the same key.
For a simple example, if you have
(key1, (1, 200, 2017/04/04 12:00:00))
(key1, (1, 300, 2017/04/04 12:00:00))
(key1, (1, 500, 2017/04/04 12:00:00))
(key2, (1, 500, 2017/04/04 12:00:00))
Then the reduceByKey will output
(key1, (3, 1000, 2017/04/04 12:00:00))
(key2, (1, 500, 2017/04/04 12:00:00))
And then your last map
will work on this by calculating the total rate:
(key1, (333, 2017/04/04 12:00:00))
(key2, (500, 2017/04/04 12:00:00))
You may have noticed that I used always the same time in all the examples. That's because your reducing function on this field will give unexpected results because it is not associative. Try doing the same exercise as above but with different timestamps, and you will see that the reduced value for key1
will be different depending on the order in which you apply the reduction.
Let's see this: we want to reduce 4, 8, and 16 with this function so we might want to do this as
((4 + 8) / 2 + 16) / 2
or as
(4 + (8 + 16) / 2) / 2
depending on if we want to start on the left or on the right (in a real case, there are many more different possibilities, and they will happen in Spark, since you do not always know how your values are distributed over the cluster).
Calculating the two possibilities above, we get different values: 11
and 8
, so you see that this may cause bigger problems in a real-life case.
A simple solution in your case would be to also do the sum of all timestamps (assuming they are Long
values, or even BigInteger
, to avoid overflow), and divide only in the end by the number of values to have the real time average.
Upvotes: 1