Reputation:
I have the data frame below, and I have grouped it by id, txnId and date:
+---------+--------------+-------+------------+---------+------+------+
| id| txnId|account| date| idl| type|amount|
+---------+--------------+-------+------------+---------+------+------+
| 153| 0000004512 | 30095| 11272020| 30| debit| 1000|
| 153| 0000004512 | 30096| 11272020| 0|credit| 200|
| 145| 0000004513 | 30095| 11272020| 0| debit| 4000|
| 135| 0000004512 | 30096| 11272020| 0|credit| 2000|
| 153| 0000004512 | 30097| 11272020| 0| debit| 1000|
| 145| 0000004514 | 30094| 11272020| 0| debit| 1000|
+---------+--------------+-------+------------+---------+------+------+
After grouping, the output is:
+---------+--------------+-------+------------+---------+------+------+
| id| txnId|account| date| idl| type|amount|
+---------+--------------+-------+------------+---------+------+------+
| 153| 0000004512| 30095| 11272020| 30| debit| 1000|
| 153| 0000004512| 30096| 11272020| 0|credit| 200|
| 153| 0000004512| 30097| 11272020| 0| debit| 1000|
| 153| 0000004512| 30097| 11272020| 0|credit| 500|
| 145| 0000004513| 30095| 11272020| 0| debit| 4000|
| 145| 0000004514| 30094| 11272020| 0| debit| 1000|
| 135| 0000004512| 30096| 11272020| 0|credit| 2000|
+---------+--------------+-------+------------+---------+------+------+
I need to add two more columns, totalcredit and totaldebit, each holding the total of the amounts of that type (credit or debit) within the row's group. The output should look like this:
+---------+--------------+-------+-----------+---------+------+------+-----------+----------+
| id| txnId|account| date| idl| type|amount|totalcredit|totaldebit|
+---------+--------------+-------+-----------+---------+------+------+-----------+----------+
| 153| 0000004512| 30095| 11272020| 30| debit| 1000| 0| 2000|
| 153| 0000004512| 30096| 11272020| 0|credit| 200| 700| 0|
| 153| 0000004512| 30097| 11272020| 0| debit| 1000| 0| 2000|
| 153| 0000004512| 30097| 11272020| 0|credit| 500| 700| 0|
| 145| 0000004513| 30095| 11272020| 0| debit| 4000| 0| 4000|
| 145| 0000004514| 30094| 11272020| 0|credit| 1000| 1000| 0|
| 135| 0000004512| 30096| 11272020| 0|credit| 2000| 2000| 0|
+---------+--------------+-------+-----------+---------+------+------+-----------+----------+
I have written the code below to add the totalcredit column:
Dataset<Row> df3 = df2.where(df2.col("type").equalTo("credit"))
    .groupBy("type")
    .agg(sum("amount")).withColumnRenamed("sum(amount)", "totalcredit");
However, this drops the other columns from the dataset. How do I preserve them?
Upvotes: 1
Views: 692
Reputation: 32650
You want to use a conditional sum aggregation over a Window partitioned by id:
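import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

// A window keeps every row; it only defines the group each total is computed over.
WindowSpec w = Window.partitionBy("id");

Dataset<Row> df3 = df2.withColumn(
    "totalcredit",
    // Credit rows get the sum of credit amounts in their partition; other rows get 0.
    when(
        col("type").equalTo("credit"),
        sum(when(col("type").equalTo("credit"), col("amount"))).over(w)
    ).otherwise(0)
).withColumn(
    "totaldebit",
    // Debit rows get the sum of debit amounts in their partition; other rows get 0.
    when(
        col("type").equalTo("debit"),
        sum(when(col("type").equalTo("debit"), col("amount"))).over(w)
    ).otherwise(0)
);

df3.show();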
//+---+-----+-------+--------+---+------+------+-----------+----------+
//| id|txnId|account| date|idl| type|amount|totalcredit|totaldebit|
//+---+-----+-------+--------+---+------+------+-----------+----------+
//|145| 4513| 30095|11272020| 0| debit| 4000| 0| 5000|
//|145| 4514| 30094|11272020| 0| debit| 1000| 0| 5000|
//|135| 4512| 30096|11272020| 0|credit| 2000| 2000| 0|
//|153| 4512| 30095|11272020| 30| debit| 1000| 0| 2000|
//|153| 4512| 30096|11272020| 0|credit| 200| 700| 0|
//|153| 4512| 30097|11272020| 0| debit| 1000| 0| 2000|
//|153| 4512| 30097|11272020| 0|credit| 500| 700| 0|
//+---+-----+-------+--------+---+------+------+-----------+----------+
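To make it clearer what the windowed conditional sum computes, here is a rough plain-Java sketch of the same logic without Spark. The names WindowedTotals, Txn, TxnWithTotals, addTotals and sampleRows are made up for this illustration and are not part of any Spark API; the sample rows are the ones from the output above, reduced to the three columns the totals depend on.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class WindowedTotals {
    // Hypothetical row type: only the columns the totals depend on.
    static final class Txn {
        final int id;
        final String type;
        final long amount;
        Txn(int id, String type, long amount) {
            this.id = id;
            this.type = type;
            this.amount = amount;
        }
    }

    // An input row plus the two derived columns.
    static final class TxnWithTotals {
        final Txn txn;
        final long totalCredit;
        final long totalDebit;
        TxnWithTotals(Txn txn, long totalCredit, long totalDebit) {
            this.txn = txn;
            this.totalCredit = totalCredit;
            this.totalDebit = totalDebit;
        }
    }

    static List<TxnWithTotals> addTotals(List<Txn> rows) {
        // Pass 1: sum the amounts per (id, type). This plays the role of
        // sum(when(type == t, amount)).over(Window.partitionBy("id")).
        Map<String, Long> sums = new HashMap<>();
        for (Txn t : rows) {
            sums.merge(t.id + "|" + t.type, t.amount, Long::sum);
        }
        // Pass 2: keep every input row and attach the total for its own type;
        // the column for the other type is 0, like otherwise(0).
        List<TxnWithTotals> out = new ArrayList<>();
        for (Txn t : rows) {
            long total = sums.get(t.id + "|" + t.type);
            out.add(new TxnWithTotals(
                t,
                t.type.equals("credit") ? total : 0,
                t.type.equals("debit") ? total : 0));
        }
        return out;
    }

    // The rows from the df3.show() output above (id, type, amount only).
    static List<Txn> sampleRows() {
        return List.of(
            new Txn(145, "debit", 4000),
            new Txn(145, "debit", 1000),
            new Txn(135, "credit", 2000),
            new Txn(153, "debit", 1000),
            new Txn(153, "credit", 200),
            new Txn(153, "debit", 1000),
            new Txn(153, "credit", 500));
    }

    public static void main(String[] args) {
        for (TxnWithTotals r : addTotals(sampleRows())) {
            System.out.println(r.txn.id + " " + r.txn.type + " " + r.txn.amount
                + " " + r.totalCredit + " " + r.totalDebit);
        }
    }
}
```

Unlike groupBy followed by agg, nothing is collapsed here: the output has one entry per input row, which is why the window version preserves the other columns. Note that partitioning by id alone gives both rows for id 145 a totaldebit of 5000. If the totals should instead be computed per id, txnId and date, as the grouping in the question suggests, the window could presumably be declared as Window.partitionBy("id", "txnId", "date").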
Upvotes: 1