alexanoid

Reputation: 25862

Apache Spark group by DF, collect values into list and then group by list

I have the following Apache Spark DataFrame (DF1):

function_name | param1 | param2 | param3 | result
---------------------------------------------------
     f1       |   a    |   b    |   c    |   1        
     f1       |   b    |   d    |   m    |   0
     f2       |   a    |   b    |   c    |   0
     f2       |   b    |   d    |   m    |   0
     f3       |   a    |   b    |   c    |   1
     f3       |   b    |   d    |   m    |   1
     f4       |   a    |   b    |   c    |   0
     f4       |   b    |   d    |   m    |   0

First, I'd like to group the DataFrame by function_name, collect the results into an ArrayType column, and get a new DataFrame (DF2):

function_name | result_list
--------------------------------
     f1       |  [1,0]
     f2       |  [0,0]
     f3       |  [1,1]
     f4       |  [0,0]

Right after that, I need to group by result_list and collect function_name into an ArrayType column, giving a new DataFrame like the following (DF3):

result_list |  function_name_lists
------------------------------------
    [1,0]   |   [f1]
    [0,0]   |   [f2,f4]
    [1,1]   |   [f3]

So my question is: can I group by an ArrayType column in Apache Spark at all? If so, a single result_list value could hold tens of millions of elements. Will Apache Spark still be able to group by the result_list column in that case?

Upvotes: 2

Views: 2278

Answers (1)

murtihash

Reputation: 8410

Yes, you can group by an ArrayType column.

Creating your data frame:

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Sample rows: function_name, param1, param2, param3, result
data = [['f1','a','b','c',1],
        ['f1','b','d','m',0],
        ['f2','a','b','c',0],
        ['f2','b','d','m',0],
        ['f3','a','b','c',1],
        ['f3','b','d','m',1],
        ['f4','a','b','c',0],
        ['f4','b','d','m',0]]

# Assumes an active SparkSession named `spark`
df = spark.createDataFrame(data, ['function_name','param1','param2','param3','result'])
df.show()

+-------------+------+------+------+------+
|function_name|param1|param2|param3|result|
+-------------+------+------+------+------+
|           f1|     a|     b|     c|     1|
|           f1|     b|     d|     m|     0|
|           f2|     a|     b|     c|     0|
|           f2|     b|     d|     m|     0|
|           f3|     a|     b|     c|     1|
|           f3|     b|     d|     m|     1|
|           f4|     a|     b|     c|     0|
|           f4|     b|     d|     m|     0|
+-------------+------+------+------+------+

Build result_list per function_name with collect_list (ordered by param1, param2, param3), then group by result_list:

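# Ordered window: collect_list builds a running list in a deterministic order
w = Window.partitionBy("function_name").orderBy("param1", "param2", "param3")
# Unordered window over the same partition, used to find the last row number
w1 = Window.partitionBy("function_name")

# Keep only the last row of each partition: it holds the complete result_list
df1 = (df.withColumn("result_list", F.collect_list("result").over(w))
    .withColumn("result2", F.row_number().over(w))
    .withColumn("result3", F.max("result2").over(w1))
    .filter(F.col("result2") == F.col("result3"))
    .drop("param1", "param2", "param3", "result", "result2", "result3"))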


df1.groupBy("result_list")\
.agg(F.collect_list("function_name").alias("function_name_list")).show()

+-----------+------------------+
|result_list|function_name_list|
+-----------+------------------+
|     [1, 0]|              [f1]|
|     [1, 1]|              [f3]|
|     [0, 0]|          [f2, f4]|
+-----------+------------------+
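
To make the two steps easier to follow without a Spark session, here is a minimal plain-Python sketch of the same logic. It is only an illustration of what the pipeline computes, not Spark code; the variable names are made up for this example. Python lists are not hashable, so the sketch uses tuples as grouping keys.

```python
from collections import defaultdict

# Same rows as the DataFrame above:
# (function_name, param1, param2, param3, result)
rows = [
    ("f1", "a", "b", "c", 1),
    ("f1", "b", "d", "m", 0),
    ("f2", "a", "b", "c", 0),
    ("f2", "b", "d", "m", 0),
    ("f3", "a", "b", "c", 1),
    ("f3", "b", "d", "m", 1),
    ("f4", "a", "b", "c", 0),
    ("f4", "b", "d", "m", 0),
]

# Step 1: group by function_name. Sorting by (name, param1, param2, param3)
# first keeps the order inside each result list deterministic.
results_by_function = defaultdict(list)
for name, p1, p2, p3, result in sorted(rows, key=lambda r: r[:4]):
    results_by_function[name].append(result)

# Step 2: group by the result list itself, using a tuple as the key.
functions_by_results = defaultdict(list)
for name, results in results_by_function.items():
    functions_by_results[tuple(results)].append(name)

grouped = dict(functions_by_results)
print(grouped)
```

The ordering in step 1 matters for the same reason the window above is ordered: two functions only land in the same group if their result lists match element by element.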

For further analysis, transformation, or cleaning of array-type columns, I recommend checking out the higher-order functions available in Spark 2.4 and above.

(collect_list works in Spark 1.6 and above.)

Higher order functions in open source:

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.collect_list

https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.array_contains onwards

Databricks releases: https://docs.databricks.com/delta/data-transformation/higher-order-lambda-functions.html
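
On the part of the question about very long result lists: one idea, which is not part of the answer above and is untested at that scale, is to group by a fixed-size digest of each list rather than by the list itself. The plain-Python sketch below only shows the idea; `fingerprint` is a hypothetical helper, and it assumes that SHA-256 collisions are unlikely enough to ignore for this purpose.

```python
import hashlib
from collections import defaultdict


def fingerprint(results):
    """Return a fixed-size SHA-256 hex digest for a sequence of ints."""
    joined = ",".join(str(r) for r in results)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Result lists per function, as produced by the first grouping step.
results_by_function = {
    "f1": [1, 0],
    "f2": [0, 0],
    "f3": [1, 1],
    "f4": [0, 0],
}

# Group function names by the digest of their result list.
functions_by_digest = defaultdict(list)
for name, results in results_by_function.items():
    functions_by_digest[fingerprint(results)].append(name)

groups = sorted(functions_by_digest.values())
print(groups)
```

In Spark, a comparable digest column could presumably be built with sha2 over concat_ws after casting the array elements to strings, but that is an assumption to verify against your Spark version and data size.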

Upvotes: 6
