Reputation: 542
I have a Spark Dataset, let's say with columns A, B, C.
I want to take the dataset and do the equivalent of Flink's
dataset.groupBy(0).sortGroup(1, Order.ASCENDING)
.reduceGroup({})
I can group and sort in Pandas and call an apply function on each group, but it is very slow, almost 10x slower than Flink.
Note: I want to do processing over the grouped data and return another dataset, not a standard aggregate.
Can someone point me to similar code showing how to do this in Java/Scala with Spark?
Upvotes: 1
Views: 264
Reputation: 4540
A couple of possible approaches, depending on the iteration logic:
Given
import spark.implicits._

val df = Seq(("a", 0, "foo"), ("b", 1, "foo"), ("a", 1, "foobar"))
  .toDF("A", "B", "C")
first preprocess it a bit
df.select($"A", struct($"B", $"C") as $"S").show()
to get
+---+-----------+
|  A|          S|
+---+-----------+
|  a|   [0, foo]|
|  b|   [1, foo]|
|  a|[1, foobar]|
+---+-----------+
Now we can apply any Scala code to the sequence of tuples S, including sorting:
df.select($"A", struct($"B", $"C") as $"S")
.groupBy("A")
.agg(collect_list("S"))
.as[(String, Seq[(Int, String)])]
.map {
case (a, l) => (a, l.sortBy(_._1).map(_._2).maxBy(_.length))
}
.show()
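With the sample data above this gives something like (row order may vary):

+---+------+
| _1|    _2|
+---+------+
|  a|foobar|
|  b|   foo|
+---+------+

i.e. for each value of A, the longest C after sorting the group by B.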
Alternatively, implement a custom Aggregator and use it as a UDAF:
import scala.collection.mutable

class MyAgg extends Aggregator[
    (Int, String),                      // input: the (B, C) struct
    mutable.ListBuffer[(Int, String)],  // buffer: the collected group
    /* any output type here */ ] {
  ...
and aggregate using that:
val myagg = udaf(new MyAgg())
df.select($"A", struct($"B", $"C") as "S").groupBy($"A").agg(myagg($"S"))
Upvotes: 2