user1888955

Reputation: 626

Nested groupBy and agg in Spark

I have a table like the one below:

transaction_id  transaction_date  partition_key  amount  record_id  record_in_date
1               2021-09-21        1              1       1          2021-09-20
1               2021-09-21        1              1       2          2021-09-20
1               2021-09-21        2              1       3          2021-09-20
2               2021-09-21        1              1       4          2021-09-20
2               2021-09-21        1              1       5          2021-09-20
3               2021-09-21        2              1       6          2021-09-20

How can I transform the table above into the schema below:

root
 |-- transaction_id: string (nullable = false)
 |-- transaction_date: string (nullable = false)
 |-- transaction_partition: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- partition_key: integer (nullable = false)
 |    |    |-- record_amount_sum: integer (nullable = false)
 |    |    |-- records: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- record_id: string (nullable = false)
 |    |    |    |    |-- record_amount: integer (nullable = false)
 |    |    |    |    |-- record_in_date: string (nullable = false)

so that the result looks like this:

transaction_id transaction
1 { transaction_id: 1, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 1, record_amount_sum: 2, records: [ { record_id: 1, record_amount: 1, record_in_date: 2021-09-20 }, { record_id: 2, record_amount: 1, record_in_date: 2021-09-20 } ] }, { partition_key: 2, record_amount_sum: 1, records: [ { record_id: 3, record_amount: 1, record_in_date: 2021-09-20 } ] } ] }
2 { transaction_id: 2, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 1, record_amount_sum: 2, records: [ { record_id: 4, record_amount: 1, record_in_date: 2021-09-20 }, { record_id: 5, record_amount: 1, record_in_date: 2021-09-20 } ] } ] }
3 { transaction_id: 3, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 2, record_amount_sum: 1, records: [ { record_id: 6, record_amount: 1, record_in_date: 2021-09-20 } ] } ] }
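
For reference, the sample table above can be reproduced as a DataFrame roughly like this (a minimal sketch: the values are copied from the table, while the column types are left to Spark's inference):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample data from the table in the question.
df = spark.createDataFrame(
    [
        (1, "2021-09-21", 1, 1, 1, "2021-09-20"),
        (1, "2021-09-21", 1, 1, 2, "2021-09-20"),
        (1, "2021-09-21", 2, 1, 3, "2021-09-20"),
        (2, "2021-09-21", 1, 1, 4, "2021-09-20"),
        (2, "2021-09-21", 1, 1, 5, "2021-09-20"),
        (3, "2021-09-21", 2, 1, 6, "2021-09-20"),
    ],
    ["transaction_id", "transaction_date", "partition_key",
     "amount", "record_id", "record_in_date"],
)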

Upvotes: 0

Views: 916

Answers (2)

werner

Reputation: 14905

You can first execute the inner aggregation and then aggregate the result once again:

from pyspark.sql import functions as F

df = ...
# First aggregate per (transaction, partition): sum the amounts and collect the
# individual records as structs; then aggregate once more per transaction to
# collect the partition-level structs.
df1 = df.groupBy("transaction_id", "transaction_date", "partition_key") \
    .agg(F.sum("amount").alias("record_amount_sum"),
         F.collect_list(F.struct(
             F.col("record_id"),
             F.col("amount").alias("record_amount"),
             F.col("record_in_date"))).alias("records")) \
    .groupBy("transaction_id", "transaction_date") \
    .agg(F.collect_list(
        F.struct("partition_key", "record_amount_sum", "records")).alias("transaction_partition"))

df1.orderBy("transaction_id").toJSON().collect()
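
If it helps, you can check the result against the schema and sample output in the question, for example:

# Inspect the produced schema and the grouped rows.
df1.printSchema()
df1.orderBy("transaction_id").show(truncate=False)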

Upvotes: 1

ggordon

Reputation: 10035

You may try the following Spark SQL query:

SELECT
    transaction_id,
    transaction_date,
    collect_list(
        STRUCT(
            partition_key,
            record_amount_sum,
            records
        ) 
    ) as transaction_partition
FROM (
    SELECT
        transaction_id,
        transaction_date,
        partition_key,
        SUM(amount) as record_amount_sum,
        collect_list(
            STRUCT(
                record_id,
                amount as record_amount,
                record_in_date
            )
        ) as records
    FROM
        my_temp_view
    GROUP BY 
        transaction_id,
        transaction_date,
        partition_key
) t
GROUP BY
    transaction_id,
    transaction_date
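
For completeness, one way to run this from PySpark, assuming the SQL text above is kept in a string variable (here called query, a placeholder) and the input DataFrame is df:

# Register the input under the view name referenced in the query above.
df.createOrReplaceTempView("my_temp_view")

# `query` is assumed to hold the SQL text shown above.
result = spark.sql(query)
result.orderBy("transaction_id").show(truncate=False)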

Let me know if this works for you.

Upvotes: 1
