naifmeh

Reputation: 408

Spark grouped map UDF in Scala

I am trying to write some code that would allow me to apply a computation to a group of rows of a dataframe. In PySpark, this is possible by defining a Pandas UDF of type GROUPED_MAP. However, in Scala, I have only found ways to create custom aggregators (UDAFs) or classic UDFs.

My temporary solution is to generate a list of keys encoding my groups, which allows me to filter the dataframe and perform my action on each subset of the dataframe. However, this approach is not optimal and very slow, as the actions are performed sequentially and therefore take a lot of time. I could parallelize the loop, but I am not sure this would show any improvement since Spark is already distributed.
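For illustration, a rough sketch of that approach (assuming a DataFrame df, a single grouping column "group", and a placeholder processSubset standing in for the actual computation):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// placeholder for the per-group computation
def processSubset(subset: DataFrame): DataFrame = subset

// collect the distinct group keys, then filter and process each subset one by one
val keys = df.select("group").distinct().collect().map(_.get(0))
val results = keys.map(key => processSubset(df.filter(col("group") === key)))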

Is there any better way to do what I want?

Edit: I tried parallelizing the loop using Futures, but there was no speed improvement, as expected.

Upvotes: 3

Views: 686

Answers (1)

RvdV

Reputation: 466

To the best of my knowledge, this is something that's not possible in Scala. Depending on what you want, I think there could be other ways of applying a transformation to a group of rows in Spark / Scala:

  1. Do a groupBy(...).agg(collect_list(<column_names>)), and use a UDF that operates on the array of values. If desired, you can use a select statement with explode(<array_column>) to revert to the original format (a sketch of this is shown after the window example below).
  2. Try rewriting what you want to achieve using window functions. You can add a new column with an aggregate expression like so:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, pmod, sum}
import spark.implicits._ // for the 'column symbol syntax

// all rows sharing the same "group" value form one window partition
val w = Window.partitionBy('group)

val result = spark.range(100)
    .withColumn("group", pmod('id, lit(3)))    // assign each id to one of 3 groups
    .withColumn("group_sum", sum('id).over(w)) // per-group aggregate without reducing rows

Upvotes: 1
