Arthur Camara
Arthur Camara

Reputation: 577

Simulating UDAF on Pyspark for encapsulation

I'm learning Spark with PySpark, and just hit a wall when trying to make things cleaner.

Say a have a dataframe that looks like this. (of course, with way more columns and rows)

A | B |   C
--+---+------
a | 1 | 1.300
a | 2 | 2.500
a | 3 | 1.000
b | 1 | 120.0
b | 4 | 34.20
c | 2 | 3.442

and I want to run a bunch of groupby -> agg on it, using basic pyspark.sql.functions , like count() and mean(), like this:

df.groupby("A")\
    .agg(mean("B").alias("B_mean"),
         sum("C").alias("C_sum"),
         (countDistinct("B")/avg("C")).alias("New_metric"))

It works fine, runs relatively fast, and gives me the desired results.

But, eventually, slightly more complex functions will be needed, and, also, we want to make these easier to test.

How can one encapsulate these functions? Using lambda? Some way around UDFs?

I'm aware of UDAFs and that it's possible to write them in SCALA and import the code to PySpark, but, since all of our code base is already in Python, I would like to explore other options.

P.S.: We are running Spark 1.6.0

Upvotes: 3

Views: 1350

Answers (1)

user6022341
user6022341

Reputation:

Function can be defined as a combination of pyspark.sql.functions:

  • YES - go this way. For example:

    def sum_of_squares(col):
        return sum(col * col)
    
    df.select(sum_of_squares(df["foo"]])
    
    df.groupBy("foo").agg(sum_of_squares(df["bar"]])
    
  • NO - use RDD.

Upvotes: 2

Related Questions