Reputation: 63
I have a text file which I read and then split using the split operation. This results in an RDD of Array(A, B, C, D, E, F, G, H, I).
I would like to compute max(F) - min(G) for every key E (i.e. reduce by key E). Then I want to sum the resulting values by key C and append this sum to every row with the same key C.
For example:
+--+--+--+--+
| C| E| F| G|
+--+--+--+--+
|en| 1| 3| 1|
|en| 1| 4| 0|
|nl| 2| 1| 1|
|nl| 2| 5| 2|
|nl| 3| 9| 3|
|nl| 3| 6| 4|
|en| 4| 9| 1|
|en| 4| 2| 1|
+--+--+--+--+
This should result in:
+--+--+-------------+---+
| C| E|max(F)-min(G)|sum|
+--+--+-------------+---+
|en| 1|            4| 12|
|nl| 2|            4| 10|
|nl| 3|            6| 10|
|en| 4|            8| 12|
+--+--+-------------+---+
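Spelled out for the sample data, with d(C, E) = max(F) - min(G) taken over the rows that share a (C, E) pair, and S(C) the sum of d over the E values belonging to C, the expected table follows from:

```latex
\begin{aligned}
d(\text{en},1) &= \max(3,4) - \min(1,0) = 4 - 0 = 4 \\
d(\text{nl},2) &= \max(1,5) - \min(1,2) = 5 - 1 = 4 \\
d(\text{nl},3) &= \max(9,6) - \min(3,4) = 9 - 3 = 6 \\
d(\text{en},4) &= \max(9,2) - \min(1,1) = 9 - 1 = 8 \\
S(\text{en}) &= d(\text{en},1) + d(\text{en},4) = 4 + 8 = 12 \\
S(\text{nl}) &= d(\text{nl},2) + d(\text{nl},3) = 4 + 6 = 10
\end{aligned}
```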
What would be the best way to tackle this? Currently I compute max(F) - min(G) by running:
val maxCounts = logEntries.map(line => (line(4), line(5).toLong)).reduceByKey((x, y) => math.max(x, y))
val minCounts = logEntries.map(line => (line(4), line(6).toLong)).reduceByKey((x, y) => math.min(x, y))
val maxMinCounts = maxCounts.join(minCounts).map{ case(id, maxmin) => (id, (maxmin._1 - maxmin._2)) }
And then I join the resulting RDDs. However, this becomes tricky when I also want to sum these values by C and append the sums to my existing data set.
I would love to hear any suggestions!
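One way to structure the RDD plan is: reduce (max F, min G) pairs in a single pass per (C, E) instead of two reductions plus a join, sum the differences per C, then join the per-C totals back onto the (C, E) rows. The sketch below shows that plan on plain Scala collections so it runs without a cluster, assuming Scala 2.13+ for groupMapReduce. Each groupMapReduce plays the role a reduceByKey would play on an RDD, and the final lookup plays the role of the join. The names AppendSumByC, Row, table and sample are hypothetical.

```scala
object AppendSumByC {
  // Hypothetical parsed row: only the columns the question uses (C, E, F, G)
  final case class Row(c: String, e: Int, f: Long, g: Long)

  // Plain-collections stand-in for: reduceByKey on (C, E), reduceByKey on C, join on C
  def table(rows: Seq[Row]): Seq[(String, Int, Long, Long)] = {
    // One pass per (C, E): carry (max F, min G) together, then take the difference
    val diffs: Map[(String, Int), Long] =
      rows
        .groupMapReduce(r => (r.c, r.e))(r => (r.f, r.g)) {
          case ((f1, g1), (f2, g2)) => (math.max(f1, f2), math.min(g1, g2))
        }
        .map { case (key, (maxF, minG)) => key -> (maxF - minG) }

    // Sum of the differences per C
    val sums: Map[String, Long] =
      diffs.groupMapReduce { case ((c, _), _) => c } { case (_, d) => d }(_ + _)

    // Attach the per-C total to every (C, E) row, ordered by E for readability
    diffs.toSeq
      .map { case ((c, e), d) => (c, e, d, sums(c)) }
      .sortBy(_._2)
  }

  // Sample rows from the question
  val sample: Seq[Row] = Seq(
    Row("en", 1, 3, 1), Row("en", 1, 4, 0),
    Row("nl", 2, 1, 1), Row("nl", 2, 5, 2),
    Row("nl", 3, 9, 3), Row("nl", 3, 6, 4),
    Row("en", 4, 9, 1), Row("en", 4, 2, 1)
  )

  def main(args: Array[String]): Unit =
    table(sample).foreach(println)
}
```

On a real RDD, the combine function passed to groupMapReduce above is associative and commutative, which is what reduceByKey requires.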
Upvotes: 1
Reputation: 1538
This kind of logic is easy to implement with the DataFrame API too. You just need to build the columns explicitly from the array:
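import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, min, sum}
import spark.implicits._ // assumes a SparkSession named spark (as in spark-shell); enables toDF and 'col syntax

// Window spanning all rows that share the same C
val window = Window.partitionBy('C)
val df = rdd
  // split yields Strings, so convert F and G to numbers before aggregating
  .map { case Array(_, _, c, _, e, f, g, _, _) => (c, e, f.toLong, g.toLong) }
  .toDF("C", "E", "F", "G")
  .groupBy('C, 'E)
  .agg((max('F) - min('G)).as("diff"))
  .withColumn("sum", sum('diff).over(window)) // per-C total repeated on every row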
Upvotes: 2
Reputation: 1586
Assuming, as in your sample data, that a given E value never spans multiple C values, you could do something like this:
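import scala.math.{max, min}

// Running max of F and min of G for one (C, E) key
case class FG(f: Int, g: Int) {
  def combine(that: FG): FG =
    FG(max(f, that.f), min(g, that.g))
  def result: Int = f - g
}

val result = rdd
  .map { case Array(_, _, c, _, e, f, g, _, _) =>
    ((c, e), FG(f.toInt, g.toInt)) } // split yields Strings, so parse F and G
  .reduceByKey(_ combine _)          // max(F) and min(G) per (C, E)
  .map { case ((c, _), fg) => (c, fg.result) }
  .reduceByKey(_ + _)                // sum of max(F) - min(G) per C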
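Since this answer relies on each E belonging to a single C, it may be worth checking that assumption on the real data. The sketch below does it on plain Scala collections (Scala 2.13+), assuming the (C, E) pairs have already been extracted; CheckEToC, eMapsToSingleC and sample are hypothetical names. On an RDD, a comparable check could map rows to (E, C), call distinct(), and look for keys that countByKey() counts more than once.

```scala
object CheckEToC {
  // True when every E value occurs together with exactly one C value,
  // i.e. keying by (C, E) groups the same rows as keying by E alone
  def eMapsToSingleC(pairs: Seq[(String, Int)]): Boolean =
    pairs
      .groupMapReduce { case (_, e) => e } { case (c, _) => Set(c) }(_ ++ _)
      .values
      .forall(_.size == 1)

  // (C, E) pairs taken from the sample table in the question
  val sample: Seq[(String, Int)] =
    Seq("en" -> 1, "en" -> 1, "nl" -> 2, "nl" -> 2, "nl" -> 3, "nl" -> 3, "en" -> 4, "en" -> 4)

  def main(args: Array[String]): Unit = {
    println(eMapsToSingleC(sample))                // true
    println(eMapsToSingleC(sample :+ ("nl" -> 1))) // false: E = 1 now appears under both en and nl
  }
}
```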
Upvotes: 1