Reputation: 33
I have a PySpark DataFrame with 3 columns. Some rows are identical in the first 2 columns but differ in the third one, see the example below.
----------------------------------------
first_name | last_name | requests_ID |
----------------------------------------
Joe        | Smith     | [2,3]       |
Joe        | Smith     | [2,3,5,6]   |
Jim        | Bush      | [9,7]       |
Jim        | Bush      | [21]        |
Sarah      | Wood      | [2,3]       |
----------------------------------------
I want to group the rows by the {first_name, last_name} columns and keep only the row with the longest {requests_ID} array. So the result should be:
----------------------------------------
first_name | last_name | requests_ID |
----------------------------------------
Joe        | Smith     | [2,3,5,6]   |
Jim        | Bush      | [9,7]       |
Sarah      | Wood      | [2,3]       |
----------------------------------------
I have tried different things like the following, but it gives me a nested array containing both rows of each group rather than the longest one:
gr_df = filtered_df.groupBy("first_name", "last_name").agg(F.collect_set("requests_ID").alias("requests_ID"))
Here is the result I get:
----------------------------------------------
first_name | last_name | requests_ID       |
----------------------------------------------
Joe        | Smith     | [[2,3],[2,3,5,6]] |
Jim        | Bush      | [[9,7],[21]]      |
Sarah      | Wood      | [2,3]             |
----------------------------------------------
Upvotes: 3
Views: 2480
Reputation: 1486
To follow through with your current df, which looks like this,
----------------------------------------------
first_name | last_name | requests_ID       |
----------------------------------------------
Joe        | Smith     | [[2,3],[2,3,5,6]] |
Jim        | Bush      | [[9,7],[21]]      |
Sarah      | Wood      | [2,3]             |
----------------------------------------------
try this,
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

def myfunc(x):
    # pick the longest inner list from the collected set of lists
    temp = []
    for item in x:
        temp.append(len(item))
    max_ind = temp.index(max(temp))
    return x[max_ind]

udf_extract = F.udf(myfunc, ArrayType(IntegerType()))
df = df.withColumn('new_requests_ID', udf_extract('requests_ID'))
#df.show()
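If it helps to see how this fits with the grouping step from your question, here is a minimal end-to-end sketch (assuming filtered_df is your original ungrouped DataFrame and udf_extract is defined as above):
import pyspark.sql.functions as F

# collect all request lists per (first_name, last_name), as in the question
gr_df = filtered_df.groupBy("first_name", "last_name") \
                   .agg(F.collect_set("requests_ID").alias("requests_ID"))

# keep only the longest list out of each collected set
gr_df = gr_df.withColumn("new_requests_ID", udf_extract("requests_ID")) \
             .drop("requests_ID")
#gr_df.show()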
or alternatively, without a separate variable declaration,
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

@F.udf(ArrayType(IntegerType()))
def myfunc(x):
    # pick the longest inner list from the collected set of lists
    temp = []
    for item in x:
        temp.append(len(item))
    max_ind = temp.index(max(temp))
    return x[max_ind]

df = df.withColumn('new_requests_ID', myfunc('requests_ID'))
#df.show()
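Note that the return type is passed to F.udf explicitly in both versions: the default return type of a Python UDF is StringType, so without ArrayType(IntegerType()) the new column would come back as a string rather than an array of integers.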
Upvotes: 1
Reputation: 2655
You can use size to determine the length of the array column and then use a window function, like below:
Imports and create sample DataFrame
import pyspark.sql.functions as f
from pyspark.sql.window import Window
df = spark.createDataFrame([('Joe', 'Smith', [2, 3]),
                            ('Joe', 'Smith', [2, 3, 5, 6]),
                            ('Jim', 'Bush', [9, 7]),
                            ('Jim', 'Bush', [21]),
                            ('Sarah', 'Wood', [2, 3])],
                           ('first_name', 'last_name', 'requests_ID'))
Define a window to get the row number of the requests_ID column based on the length of the array, in descending order. Here, f.size("requests_ID") gives the length of the requests_ID array and desc() sorts it in descending order.
w_spec = Window().partitionBy("first_name", "last_name").orderBy(f.size("requests_ID").desc())
Apply the window function and keep the first row of each group.
df.withColumn("rn", f.row_number().over(w_spec)).where("rn == 1").drop("rn").show()
+----------+---------+------------+
|first_name|last_name| requests_ID|
+----------+---------+------------+
| Jim| Bush| [9, 7]|
| Sarah| Wood| [2, 3]|
| Joe| Smith|[2, 3, 5, 6]|
+----------+---------+------------+
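Note that row_number() keeps exactly one row per group even if two arrays tie on length. If you instead wanted to keep all tied rows, the same window spec should also work with rank(), for example:
df.withColumn("rn", f.rank().over(w_spec)).where("rn == 1").drop("rn").show()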
Upvotes: 2