Reputation: 626
I have a table like below:
transaction_id | transaction_date | partition_key | amount | record_id | record_in_date |
---|---|---|---|---|---|
1 | 2021-09-21 | 1 | 1 | 1 | 2021-09-20 |
1 | 2021-09-21 | 1 | 1 | 2 | 2021-09-20 |
1 | 2021-09-21 | 2 | 1 | 3 | 2021-09-20 |
2 | 2021-09-21 | 1 | 1 | 4 | 2021-09-20 |
2 | 2021-09-21 | 1 | 1 | 5 | 2021-09-20 |
3 | 2021-09-21 | 2 | 1 | 6 | 2021-09-20 |
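For reproducibility, here is a minimal sketch of this sample data as a PySpark DataFrame (the session setup is assumed; the integer columns would still need a cast to string to match the target schema exactly):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# rows copied from the table above
data = [
    (1, "2021-09-21", 1, 1, 1, "2021-09-20"),
    (1, "2021-09-21", 1, 1, 2, "2021-09-20"),
    (1, "2021-09-21", 2, 1, 3, "2021-09-20"),
    (2, "2021-09-21", 1, 1, 4, "2021-09-20"),
    (2, "2021-09-21", 1, 1, 5, "2021-09-20"),
    (3, "2021-09-21", 2, 1, 6, "2021-09-20"),
]
columns = ["transaction_id", "transaction_date", "partition_key",
           "amount", "record_id", "record_in_date"]
df = spark.createDataFrame(data, columns)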
How can I transform the table above into the schema below:
root
 |-- transaction_id: string (nullable = false)
 |-- transaction_date: string (nullable = false)
 |-- transaction_partition: struct (nullable = false)
 |    |-- partition_key: integer (nullable = false)
 |    |-- record_amount_sum: integer (nullable = false)
 |    |-- records: struct (nullable = false)
 |    |    |-- record_id: string (nullable = false)
 |    |    |-- record_amount: integer (nullable = false)
 |    |    |-- record_in_date: string (nullable = false)
like this:
transaction_id | transaction |
---|---|
1 | { transaction_id: 1, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 1, record_amount_sum: 2, records: [ { record_id: 1, record_in_date: 2021-09-20 }, { record_id: 2, record_in_date: 2021-09-20 } ] }, { partition_key: 2, record_amount_sum: 1, records: [ { record_id: 3, record_in_date: 2021-09-20 } ] } ] } |
2 | { transaction_id: 2, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 1, record_amount_sum: 2, records: [ { record_id: 4, record_in_date: 2021-09-20 }, { record_id: 5, record_in_date: 2021-09-20 } ] } ] } |
3 | { transaction_id: 3, transaction_date: 2021-09-21, transaction_partition: [ { partition_key: 2, record_amount_sum: 1, records: [ { record_id: 6, record_in_date: 2021-09-20 } ] } ] } |
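Note that, to match the sample output above, transaction_partition and records effectively behave as arrays of structs. For reference, a minimal sketch of that target schema as a PySpark StructType (field names and nullability taken from the printed schema above):

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType)

target_schema = StructType([
    StructField("transaction_id", StringType(), False),
    StructField("transaction_date", StringType(), False),
    StructField("transaction_partition", ArrayType(StructType([
        StructField("partition_key", IntegerType(), False),
        StructField("record_amount_sum", IntegerType(), False),
        StructField("records", ArrayType(StructType([
            StructField("record_id", StringType(), False),
            StructField("record_amount", IntegerType(), False),
            StructField("record_in_date", StringType(), False),
        ])), False),
    ])), False),
])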
Upvotes: 0
Views: 916
Reputation: 14905
You can first run the inner aggregation and then aggregate the result once again:
from pyspark.sql import functions as F
df = ...
# Inner aggregation: collect the records per (transaction_id, transaction_date, partition_key),
# then collect the partition structs per (transaction_id, transaction_date)
df1 = df.groupBy("transaction_id", "transaction_date", "partition_key") \
    .agg(F.sum("amount").alias("record_amount_sum"),
         F.collect_list(F.struct("record_id",
                                 F.col("amount").alias("record_amount"),
                                 "record_in_date")).alias("records")) \
    .groupBy("transaction_id", "transaction_date") \
    .agg(F.collect_list(F.struct("partition_key", "record_amount_sum",
                                 "records")).alias("transaction_partition"))
df1.orderBy("transaction_id").toJSON().collect()
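If you also want the single JSON-string transaction column shown in the question, a minimal follow-up sketch using to_json (the column name transaction is taken from the question):

df2 = df1.select(
    "transaction_id",
    F.to_json(F.struct("transaction_id", "transaction_date",
                       "transaction_partition")).alias("transaction"))
df2.show(truncate=False)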
Upvotes: 1
Reputation: 10035
You may try the following Spark SQL query:
SELECT
    transaction_id,
    transaction_date,
    collect_list(
        STRUCT(
            partition_key,
            record_amount_sum,
            records
        )
    ) as transaction_partition
FROM (
    SELECT
        transaction_id,
        transaction_date,
        partition_key,
        SUM(amount) as record_amount_sum,
        collect_list(
            STRUCT(
                record_id,
                amount as record_amount,
                record_in_date
            )
        ) as records
    FROM
        my_temp_view
    GROUP BY
        transaction_id,
        transaction_date,
        partition_key
) t
GROUP BY
    transaction_id,
    transaction_date
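Assuming your data is already in a DataFrame df, you can run this from PySpark roughly like this (a sketch; query is assumed to hold the SQL string above):

# register the source DataFrame under the name used in the FROM clause above
df.createOrReplaceTempView("my_temp_view")

# `query` is assumed to hold the SQL string shown above
result = spark.sql(query)
result.orderBy("transaction_id").show(truncate=False)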
Let me know if this works for you.
Upvotes: 1