Reputation: 873
I have this data frame.
uid status count amount
14 success 1 50
14 failed 2 60
This is the output I want
uid result
14 {"successful_count": 1, "successful_amount": 50, "failure_count": 2, "failure_amount": 60}
I have tried this so far.
from pyspark.sql.functions import col, from_json, struct, to_json, when
from pyspark.sql.types import DoubleType, IntegerType, StructField, StructType

schema = StructType([
    StructField("successful_amount", DoubleType(), True),
    StructField("successful_count", IntegerType(), True),
    StructField("failure_amount", DoubleType(), True),
    StructField("failure_count", IntegerType(), True),
])
(df_amount
    .withColumn("res", when(col("status") == "success", to_json(struct(col("amount").alias("successful_amount"), col("count").alias("successful_count"))))
        .when(col("status") == "failed", to_json(struct(col("amount").alias("failure_amount"), col("count").alias("failure_count")))))
    .withColumn("res", from_json(col("res"), schema)))
This adds one more struct column, 'res', but now I am not sure how to group by uid and combine the two structs/JSON objects into one. Do I need to write a custom aggregate function? Is there an easier or better way to get this output?
Upvotes: 0
Views: 915
Reputation: 573
Option 1:
This approach uses an aggregate function to group the rows by uid, and a UDF to build the dictionary you are looking for:
from pyspark.sql.functions import udf, collect_list
# First, group by uid and collect each column into a list, so all values for a uid share one row
grouped_df = df_amount.groupby('uid').agg(
    collect_list('status').alias("name"),
    collect_list('count').alias("count"),
    collect_list('amount').alias("amount"),
)
This is the resulting pyspark dataframe:
|uid| name| count| amount|
| 14|[success, failed]|[1, 2]|[50, 60]|
After that, create the column containing the dictionary you are looking for:
# Define a custom function that builds the dictionary from the three collected lists
def new_column(name, count, amount):
    nr = dict()
    for i in range(len(name)):
        nr[name[i] + "_amount"] = amount[i]
        nr[name[i] + "_count"] = count[i]
    return nr

# Wrap the function as a UDF (the default return type is StringType) and apply it
new_column_udf = udf(new_column)
result = grouped_df.withColumn("result", new_column_udf(grouped_df['name'], grouped_df['count'], grouped_df['amount'])).select("uid", "result")
result.show(truncate=False)
The resulting data frame:
|uid|result |
|14 |{success_count=1, failed_count=2, success_amount=50, failed_amount=60}|
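Note that the output above uses the raw status values as key prefixes (success_, failed_), while the question asks for successful_ and failure_ keys in a JSON string. Below is a minimal sketch of one way to adapt the function. The PREFIXES mapping and the build_result name are hypothetical, not part of the original answer. It is plain Python, so it can be checked on ordinary lists before being wrapped with udf:

```python
import json

# Hypothetical mapping from status values to the key prefixes the question asks for.
PREFIXES = {"success": "successful", "failed": "failure"}

def build_result(names, counts, amounts):
    """Build the JSON string for one uid from its three collected lists."""
    result = {}
    for name, count, amount in zip(names, counts, amounts):
        prefix = PREFIXES.get(name, name)  # fall back to the raw status if unmapped
        result[prefix + "_count"] = count
        result[prefix + "_amount"] = amount
    return json.dumps(result)

print(build_result(["success", "failed"], [1, 2], [50, 60]))
# {"successful_count": 1, "successful_amount": 50, "failure_count": 2, "failure_amount": 60}
```

Since build_result already returns a string, wrapping it as udf(build_result) should let it stand in for new_column in the withColumn call above.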
Option 2:
You could also build a dataframe by joining the rows that share a uid but have a different status. After that, add a res column that follows the schema you are looking for:
from pyspark.sql.functions import col, struct, to_json
# Split the dataframe by status, rename the value columns, then join on uid so all values share one row
success_df = df_amount.filter(col("status") == "success").withColumnRenamed("count", "successful_count").withColumnRenamed("amount", "successful_amount")
failed_df = df_amount.filter(col("status") == "failed").withColumnRenamed("count", "failure_count").withColumnRenamed("amount", "failure_amount")
df_amount_joined = success_df.join(failed_df, on="uid", how="left").drop("status")
This gives the following dataframe:
|uid|successful_count|successful_amount|failure_count|failure_amount|
| 14|               1|               50|            2|            60|
Create the last column using struct as you did in your example:
# Finally, pack the four columns into a struct and serialize it to JSON
df_final = df_amount_joined.withColumn("res", to_json(struct(col("successful_count"), col("successful_amount"), col("failure_count"), col("failure_amount")))).select("uid", "res")
df_final.show(truncate=False)
That will leave you with the dataframe you were looking for:
|uid|res |
|14 |{"successful_count":1,"successful_amount":50,"failure_count":2,"failure_amount":60}|
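One thing to watch with the join in Option 2: because it is a left join that starts from the success rows, a uid that only has failed rows would not appear in the result. The plain-Python model below is not Spark code, and the uid 15 row is made-up data for illustration. It mimics the two join types on dictionaries to show the difference:

```python
# Rows as (uid, status, count, amount); uid 15 is hypothetical and has no success row.
rows = [(14, "success", 1, 50), (14, "failed", 2, 60), (15, "failed", 3, 70)]

success = {uid: (c, a) for uid, s, c, a in rows if s == "success"}
failed = {uid: (c, a) for uid, s, c, a in rows if s == "failed"}

def join(left, right, how):
    """Model a join on uid; a missing side becomes (None, None), like SQL nulls."""
    keys = left.keys() if how == "left" else left.keys() | right.keys()
    empty = (None, None)
    return {k: left.get(k, empty) + right.get(k, empty) for k in sorted(keys)}

left_joined = join(success, failed, "left")
full_joined = join(success, failed, "full")
print(left_joined)  # {14: (1, 50, 2, 60)}
print(full_joined)  # {14: (1, 50, 2, 60), 15: (None, None, 3, 70)}
```

In Spark, passing how="full" to join instead of how="left" keeps such uids, with nulls for the missing side.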
Upvotes: 2