Reputation: 25862
I have the following Apache Spark DataFrame (DF1):
function_name | param1 | param2 | param3 | result
---------------------------------------------------
f1 | a | b | c | 1
f1 | b | d | m | 0
f2 | a | b | c | 0
f2 | b | d | m | 0
f3 | a | b | c | 1
f3 | b | d | m | 1
f4 | a | b | c | 0
f4 | b | d | m | 0
First of all, I'd like to group the DataFrame by function_name, collect the results into an ArrayType column, and receive a new DataFrame (DF2):
function_name | result_list
--------------------------------
f1 | [1,0]
f2 | [0,0]
f3 | [1,1]
f4 | [0,0]
Right after that, I need to collect function_name values into an ArrayType column by grouping on result_list, and I'll receive a new DataFrame like the following (DF3):
result_list | function_name_lists
------------------------------------
[1,0] | [f1]
[0,0] | [f2,f4]
[1,1] | [f3]
So, I have a question: first of all, can I group by an ArrayType column in Apache Spark? If so, a single result_list ArrayType field can potentially hold tens of millions of values. Will Apache Spark be able to group by the result_list column in that case?
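This is roughly what I have in mind (a minimal sketch using groupBy and collect_list; the element order inside result_list isn't guaranteed here, and whether the second groupBy scales is exactly my question):
from pyspark.sql import functions as F
# Step 1: group by function_name and collect the result values into an array (DF2).
df2 = df1.groupBy("function_name").agg(F.collect_list("result").alias("result_list"))
# Step 2: group by the array column itself and collect the matching function names (DF3).
df3 = df2.groupBy("result_list").agg(F.collect_list("function_name").alias("function_name_lists"))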
Upvotes: 2
Views: 2278
Reputation: 8410
Yes, you can do that.
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Recreate the sample DataFrame from the question.
data = [['f1','a','b','c',1],
        ['f1','b','d','m',0],
        ['f2','a','b','c',0],
        ['f2','b','d','m',0],
        ['f3','a','b','c',1],
        ['f3','b','d','m',1],
        ['f4','a','b','c',0],
        ['f4','b','d','m',0]]
df = spark.createDataFrame(data, ['function_name','param1','param2','param3','result'])
df.show()
df.show()
+-------------+------+------+------+------+
|function_name|param1|param2|param3|result|
+-------------+------+------+------+------+
| f1| a| b| c| 1|
| f1| b| d| m| 0|
| f2| a| b| c| 0|
| f2| b| d| m| 0|
| f3| a| b| c| 1|
| f3| b| d| m| 1|
| f4| a| b| c| 0|
| f4| b| d| m| 0|
+-------------+------+------+------+------+
# The ordered window makes collect_list build result_list in a deterministic
# order (param1, param2, param3) within each function_name.
w = Window().partitionBy("function_name").orderBy(F.col("param1"), F.col("param2"), F.col("param3"))
w1 = Window().partitionBy("function_name")
# Keep only the last row of each group, i.e. the row where the running
# collect_list is complete (this is DF2 from the question).
df1 = df.withColumn("result_list", F.collect_list("result").over(w))\
    .withColumn("result2", F.row_number().over(w))\
    .withColumn("result3", F.max("result2").over(w1))\
    .filter(F.col("result2") == F.col("result3"))\
    .drop("param1", "param2", "param3", "result", "result2", "result3")
# Group by the array column itself to collect the function names per result_list (DF3).
df1.groupBy("result_list")\
    .agg(F.collect_list("function_name").alias("function_name_list")).show()
+-----------+------------------+
|result_list|function_name_list|
+-----------+------------------+
| [1, 0]| [f1]|
| [1, 1]| [f3]|
| [0, 0]| [f2, f4]|
+-----------+------------------+
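Alternatively, if you only need the elements inside result_list in some deterministic order (not necessarily ordered by param1/param2/param3), a plain groupBy with sort_array is a shorter sketch of the same idea:
# Alternative sketch: sort_array makes the collected list deterministic,
# but sorted by value rather than by the param columns.
df2 = df.groupBy("function_name")\
    .agg(F.sort_array(F.collect_list("result")).alias("result_list"))
df2.groupBy("result_list")\
    .agg(F.collect_list("function_name").alias("function_name_list")).show()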
For doing further analysis, transformation or cleaning on array-type columns, I would recommend you check out the new higher-order functions in Spark 2.4 and above.
(collect_list will work for Spark 1.6 and above)
Higher-order functions in open-source Spark: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.collect_list
Databricks docs: https://docs.databricks.com/delta/data-transformation/higher-order-lambda-functions.html
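As a small sketch of those higher-order functions (the any_pass column name is just an illustration), you can apply them through F.expr on the array column built above:
# exists() is a Spark 2.4+ higher-order SQL function; this flags whether any
# result in the array equals 1.
df1.withColumn("any_pass", F.expr("exists(result_list, x -> x = 1)")).show()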
Upvotes: 6