user1228785

Reputation: 542

Spark 3.0 Sort and apply on group Scala/Java

I have a Spark Dataset, let's say with columns A, B, C.

I want to group the dataset by one column, sort within each group by another column, and then apply a function over each group.

In Flink

dataset.groupBy(0).sortGroup(1, Order.ASCENDING)
                .reduceGroup({})

In PySpark

We can call an apply function with pandas on each group and do the sorting inside pandas, however it is very slow, almost 10x slower than Flink.

Note: I want to do processing over the grouped data and return another dataset, not a standard aggregate.

Can someone point me to similar code showing how to do this in Java/Scala with Spark?

Upvotes: 1

Views: 264

Answers (1)

ollik1

Reputation: 4540

A couple of possible approaches depending on the iteration logic:

Using Dataset API

Given

import org.apache.spark.sql.functions._  // struct, collect_list, udaf
import spark.implicits._                 // toDF, $-syntax, tuple encoders
val df =
  Seq(("a", 0, "foo"), ("b", 1, "foo"), ("a", 1, "foobar"))
    .toDF("A", "B", "C")

first preprocess it a bit

df.select($"A", struct($"B", $"C") as "S").show()

to get

+---+-----------+
|  A|          S|
+---+-----------+
|  a|   [0, foo]|
|  b|   [1, foo]|
|  a|[1, foobar]|
+---+-----------+

Now we can apply any Scala code to the sequence of tuples S including sorting:

df.select($"A", struct($"B", $"C") as "S")
  .groupBy("A")
  .agg(collect_list("S"))             // gather each group's (B, C) structs into a list
  .as[(String, Seq[(Int, String)])]   // switch to the typed Dataset API
  .map {
    // sort each group by B, then (for example) keep the longest C
    case (a, l) => (a, l.sortBy(_._1).map(_._2).maxBy(_.length))
  }
  .show()
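
For the sample data above, this should print something like (the exact row order of show() is not guaranteed):

+---+------+
| _1|    _2|
+---+------+
|  a|foobar|
|  b|   foo|
+---+------+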

Using UDAFs

Implement a custom UDAF:

class MyAgg extends Aggregator[
      (Int, String),
      mutable.ListBuffer[(Int, String)],
      /* any output type here */] {
...

and aggregate using that:

val myagg = udaf(new MyAgg())
df.select($"A", struct($"B", $"C") as "S").groupBy($"A").agg(myagg($"S"))
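
For reference, a minimal sketch of what MyAgg could look like. The output type and the reduction in finish are placeholders chosen for illustration (sort each group's (B, C) pairs by B and concatenate the C values); the original answer leaves them open:

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable

class MyAgg extends Aggregator[
      (Int, String),                     // input: one S struct, i.e. (B, C)
      mutable.ListBuffer[(Int, String)], // buffer: all pairs seen so far
      String] {                          // output: placeholder type for this sketch

  def zero: mutable.ListBuffer[(Int, String)] = mutable.ListBuffer.empty

  def reduce(buf: mutable.ListBuffer[(Int, String)],
             in: (Int, String)): mutable.ListBuffer[(Int, String)] = buf += in

  def merge(b1: mutable.ListBuffer[(Int, String)],
            b2: mutable.ListBuffer[(Int, String)]): mutable.ListBuffer[(Int, String)] = b1 ++= b2

  // The whole group is available here, so sorting and arbitrary processing are possible.
  def finish(buf: mutable.ListBuffer[(Int, String)]): String =
    buf.sortBy(_._1).map(_._2).mkString(",")

  def bufferEncoder: Encoder[mutable.ListBuffer[(Int, String)]] =
    Encoders.kryo[mutable.ListBuffer[(Int, String)]]

  def outputEncoder: Encoder[String] = Encoders.STRING
}

The ListBuffer plus Kryo buffer encoder is just one choice; the important part is that finish sees the complete group, which is where the sort-then-process step goes.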

Upvotes: 2
