I have a dataframe like below in PySpark:
df = sqlContext.createDataFrame(
    [
        (100, "Single", 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.02),
        (101, "Single", 8.03, 0.00, 2.10, 1.46, 4.01, 0.00, 0.63),
        (102, "Single", 0.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.02),
        (100, "STACK", 10.00, 0.00, 0.00, 0.00, 0.00, 0.00, 0.02),
        (101, "Single", 0.00, 0.00, 13.36, 125.90, 417.93, 0.00, 20.91),
        (102, "PLUS", 188.67, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00),
        (100, "STACK", 0.00, 0.00, 1.81, 0.00, 0.00, 0.00, 0.00),
        (101, "STACK", 0.00, 3.34, -0.01, 53.97, 43.26, 0.00, 5.64),
        (102, "STACK", 417.93, 0.00, 0.00, 0.00, 0.00, 0.00, 0.00)
    ],
    ("MERCHANT_ID", "RATE", "INCOME_PDR", "MULTI_PDR", "TRANS_FEE_INCOME",
     "PDR_MARGIN", "INTER_CHANGE", "TOTAL_MULTI_PDR", "VOLUME_INC_MULTI"))
df.show()
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
|MERCHANT_ID| RATE|INCOME_PDR|MULTI_PDR|TRANS_FEE_INCOME|PDR_MARGIN|INTER_CHANGE|TOTAL_MULTI_PDR|VOLUME_INC_MULTI|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
| 100|Single| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.02|
| 101|Single| 8.03| 0.0| 2.1| 1.46| 4.01| 0.0| 0.63|
| 102|Single| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.02|
| 100| STACK| 10.0| 0.0| 0.0| 0.0| 0.0| 0.0| 0.02|
| 101|Single| 0.0| 0.0| 13.36| 125.9| 417.93| 0.0| 20.91|
| 102| PLUS| 188.67| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
| 100| STACK| 0.0| 0.0| 1.81| 0.0| 0.0| 0.0| 0.0|
| 101| STACK| 0.0| 3.34| -0.01| 53.97| 43.26| 0.0| 5.64|
| 102| STACK| 417.93| 0.0| 0.0| 0.0| 0.0| 0.0| 0.0|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
From this dataframe I would like to sum a column and add a new column whose value is the name of the column being summed. I have done it like below:
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit
df1 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('INCOME_PDR'))\
.select(['MERCHANT_ID','RATE','INCOME_PDR','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('INCOME_PDR')\
.withColumnRenamed("sum(INCOME_PDR)", "AMOUNT_AUD")
df1.show()
+-----------+------+---------------------+----------+
|MERCHANT_ID| RATE|ACQUIRED_REVENUE_CODE|AMOUNT_AUD|
+-----------+------+---------------------+----------+
| 100| STACK| INCOME_PDR| 10.0|
| 101|Single| INCOME_PDR| 8.03|
| 100|Single| INCOME_PDR| 0.0|
| 102| PLUS| INCOME_PDR| 188.67|
| 102| STACK| INCOME_PDR| 417.93|
| 101| STACK| INCOME_PDR| 0.0|
| 102|Single| INCOME_PDR| 0.0|
+-----------+------+---------------------+----------+
The same way for another column:
df2 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('MULTI_PDR'))\
.select(['MERCHANT_ID','RATE','MULTI_PDR','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('MULTI_PDR')\
.withColumnRenamed("sum(MULTI_PDR)", "AMOUNT_AUD")
df2.show()
+-----------+------+---------------------+----------+
|MERCHANT_ID| RATE|ACQUIRED_REVENUE_CODE|AMOUNT_AUD|
+-----------+------+---------------------+----------+
| 102| PLUS| MULTI_PDR| 0.0|
| 102| STACK| MULTI_PDR| 0.0|
| 101| STACK| MULTI_PDR| 3.34|
| 102|Single| MULTI_PDR| 0.0|
| 100| STACK| MULTI_PDR| 0.0|
| 101|Single| MULTI_PDR| 0.0|
| 100|Single| MULTI_PDR| 0.0|
+-----------+------+---------------------+----------+
I want to do this for around 7 columns. I have done it like below.
Column 3:
df3 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('TRANS_FEE_INCOME'))\
.select(['MERCHANT_ID','RATE','TRANS_FEE_INCOME','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('TRANS_FEE_INCOME')\
.withColumnRenamed("sum(TRANS_FEE_INCOME)", "AMOUNT_AUD")
Column 4:
df4 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('PDR_MARGIN'))\
.select(['MERCHANT_ID','RATE','PDR_MARGIN','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('PDR_MARGIN')\
.withColumnRenamed("sum(PDR_MARGIN)", "AMOUNT_AUD")
Column 5:
df5 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('INTER_CHANGE'))\
.select(['MERCHANT_ID','RATE','INTER_CHANGE','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('INTER_CHANGE')\
.withColumnRenamed("sum(INTER_CHANGE)", "AMOUNT_AUD")
Column 6:
df6 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('TOTAL_MULTI_PDR'))\
.select(['MERCHANT_ID','RATE','TOTAL_MULTI_PDR','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('TOTAL_MULTI_PDR')\
.withColumnRenamed("sum(TOTAL_MULTI_PDR)", "AMOUNT_AUD")
Column 7:
df7 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('VOLUME_INC_MULTI'))\
.select(['MERCHANT_ID','RATE','VOLUME_INC_MULTI','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('VOLUME_INC_MULTI')\
.withColumnRenamed("sum(VOLUME_INC_MULTI)", "AMOUNT_AUD")
Then I want to do a union of all 7 dataframes created. I have done it like below:
dfs = [df1, df2, df3, df4, df5, df6, df7]
df8 = reduce(DataFrame.unionAll, dfs)
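Since the seven blocks differ only in the column name, the same list could also be built in a loop; a minimal sketch reusing the names already defined above (value_cols is introduced here just for illustration):
value_cols = ['INCOME_PDR', 'MULTI_PDR', 'TRANS_FEE_INCOME', 'PDR_MARGIN',
              'INTER_CHANGE', 'TOTAL_MULTI_PDR', 'VOLUME_INC_MULTI']
# one long-format dataframe per value column, same as df1..df7 above
dfs = [df.withColumn('ACQUIRED_REVENUE_CODE', lit(c))
         .groupby('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE')
         .sum(c)
         .withColumnRenamed('sum({})'.format(c), 'AMOUNT_AUD')
       for c in value_cols]
df8 = reduce(DataFrame.unionAll, dfs)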
I am able to get what is required. I would like to know if there is a better approach than what I have done.
You can transform your dataframe by using flatMap. flatMap applies the lambda function to each row, and inside it map converts the row into one 4-tuple per column index i, such that:
row[0] = value of the MERCHANT_ID column
row[1] = value of the RATE column
cols[i] = the i-th element of cols, for i in range(2, len(cols))
row[i] = value of the i-th column, for i in range(2, len(cols))
Given the header and the first row of the input:
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
|MERCHANT_ID| RATE|INCOME_PDR|MULTI_PDR|TRANS_FEE_INCOME|PDR_MARGIN|INTER_CHANGE|TOTAL_MULTI_PDR|VOLUME_INC_MULTI|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
|        100|Single|       0.0|      0.0|             0.0|       0.0|         0.0|            0.0|            0.02|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
For example, the 4-tuples made from this first row are (note that cols = df.columns, so the value columns start at index 2):
i=2: (100, 'Single', 'INCOME_PDR', 0.0)   <-- cols[2] is INCOME_PDR
i=3: (100, 'Single', 'MULTI_PDR', 0.0)    <-- cols[3] is MULTI_PDR
...
i=len(cols)-1: (100, 'Single', 'VOLUME_INC_MULTI', 0.02)   <-- cols[len(cols)-1] is VOLUME_INC_MULTI
This is repeated for every row by flatMap.
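In plain Python terms (outside Spark, just to illustrate the expansion; the names match the code below), the lambda turns one input row into len(cols) - 2 = 7 tuples, and flatMap concatenates those lists across all rows:
cols = ['MERCHANT_ID', 'RATE', 'INCOME_PDR', 'MULTI_PDR', 'TRANS_FEE_INCOME',
        'PDR_MARGIN', 'INTER_CHANGE', 'TOTAL_MULTI_PDR', 'VOLUME_INC_MULTI']
row = (100, 'Single', 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.02)
# one input row -> seven 4-tuples
tuples = [(row[0], row[1], cols[i], row[i]) for i in range(2, len(cols))]
# [(100, 'Single', 'INCOME_PDR', 0.0), ..., (100, 'Single', 'VOLUME_INC_MULTI', 0.02)]
The actual Spark code: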
#schema = df.schema
#df2 = df.rdd.flatMap(lambda row: map(lambda i: (row[0], row[1], schema[i].name, row[i]), range(2, len(schema)) )) \
# .toDF(['MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE', 'AMOUNT_AUD'])
cols = df.columns
df2 = df.rdd.flatMap(lambda row: map(lambda i: (row[0], row[1], cols[i], row[i]), range(2, len(cols)) )) \
.toDF(['MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE', 'AMOUNT_AUD'])
import pyspark.sql.functions as f
df2.groupBy('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE') \
.agg(f.sum('AMOUNT_AUD').alias('AMOUNT_AUD')) \
.show(30, False)
+-----------+------+---------------------+----------+
|MERCHANT_ID|RATE |ACQUIRED_REVENUE_CODE|AMOUNT_AUD|
+-----------+------+---------------------+----------+
|102 |STACK |TRANS_FEE_INCOME |0.0 |
|100 |STACK |PDR_MARGIN |0.0 |
|100 |STACK |TOTAL_MULTI_PDR |0.0 |
|102 |STACK |MULTI_PDR |0.0 |
|102 |STACK |PDR_MARGIN |0.0 |
|102 |STACK |INCOME_PDR |417.93 |
|100 |Single|INCOME_PDR |0.0 |
|100 |Single|PDR_MARGIN |0.0 |
|101 |STACK |INTER_CHANGE |43.26 |
|100 |Single|TOTAL_MULTI_PDR |0.0 |
|100 |STACK |INTER_CHANGE |0.0 |
|102 |STACK |VOLUME_INC_MULTI |0.0 |
|102 |Single|TOTAL_MULTI_PDR |0.0 |
|102 |PLUS |MULTI_PDR |0.0 |
|100 |Single|VOLUME_INC_MULTI |0.02 |
|101 |Single|TOTAL_MULTI_PDR |0.0 |
|100 |Single|INTER_CHANGE |0.0 |
|101 |STACK |INCOME_PDR |0.0 |
|101 |STACK |TOTAL_MULTI_PDR |0.0 |
|102 |Single|INTER_CHANGE |0.0 |
|102 |STACK |TOTAL_MULTI_PDR |0.0 |
|100 |Single|TRANS_FEE_INCOME |0.0 |
|102 |Single|INCOME_PDR |0.0 |
|101 |STACK |PDR_MARGIN |53.97 |
|101 |Single|INCOME_PDR |8.03 |
|100 |STACK |TRANS_FEE_INCOME |1.81 |
|102 |PLUS |INTER_CHANGE |0.0 |
|102 |PLUS |TRANS_FEE_INCOME |0.0 |
|101 |Single|INTER_CHANGE |421.94 |
|102 |Single|VOLUME_INC_MULTI |0.02 |
+-----------+------+---------------------+----------+
only showing top 30 rows
Here is another way to do it with only the dataframe API:
cols = df.columns
cols.remove('MERCHANT_ID')
cols.remove('RATE')
import pyspark.sql.functions as f
# pair each column name with its value, then explode into one row per pair
df.withColumn('array', f.explode(f.arrays_zip(f.array(*[f.lit(c) for c in cols]), f.array(*cols)))) \
    .select('MERCHANT_ID', 'RATE', 'array.*') \
    .toDF('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE', 'AMOUNT_AUD') \
    .groupBy('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE') \
    .agg(f.sum('AMOUNT_AUD').alias('AMOUNT_AUD')) \
    .show(30, False)
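As a further alternative (not from the original answer), Spark's SQL stack() function expresses the same unpivot without the RDD round-trip; a minimal sketch, assuming the same df and the cols list defined just above:
# stack(n, name1, col1, ..., namen, coln) emits n (name, value) rows per input row
unpivot_expr = 'stack({}, {}) as (ACQUIRED_REVENUE_CODE, AMOUNT_AUD)'.format(
    len(cols), ', '.join("'{0}', {0}".format(c) for c in cols))
df.select('MERCHANT_ID', 'RATE', f.expr(unpivot_expr)) \
    .groupBy('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE') \
    .agg(f.sum('AMOUNT_AUD').alias('AMOUNT_AUD')) \
    .show(30, False)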