nmr
nmr

Reputation: 753

Dynamically union data frames in pyspark

I have a dataframe like below in pyspark

df = sqlContext.createDataFrame(
[
(100,"Single",0.00,0.00,0.00,0.00,0.00,0.00,0.02),
(101,"Single",8.03,0.00,2.10,1.46,4.01,0.00,0.63),
(102,"Single",0.00,0.00,0.00,0.00,0.00,0.00,0.02),
(100,"STACK",10.00,0.00,0.00,0.00,0.00,0.00,0.02),
(101,"Single",0.00,0.00,13.36,125.90,417.93,0.00,20.91),
(102,"PLUS",188.67,0.00,0.00,0.00,0.00,0.00,0.00),
(100,"STACK",0.00,0.00,1.81,0.00,0.00,0.00,0.00),
(101,"STACK",0.00,3.34,-0.01,53.97,43.26,0.00,5.64),
(102,"STACK",417.93,0.00,0.00,0.00,0.00,0.00,0.00)
], 
("MERCHANT_ID","RATE", "INCOME_PDR" , "MULTI_PDR" , "TRANS_FEE_INCOME" , "PDR_MARGIN" , "INTER_CHANGE" , "TOTAL_MULTI_PDR" , "VOLUME_INC_MULTI"))

df.show()

+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
|MERCHANT_ID|  RATE|INCOME_PDR|MULTI_PDR|TRANS_FEE_INCOME|PDR_MARGIN|INTER_CHANGE|TOTAL_MULTI_PDR|VOLUME_INC_MULTI|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
|        100|Single|       0.0|      0.0|             0.0|       0.0|         0.0|            0.0|            0.02|
|        101|Single|      8.03|      0.0|             2.1|      1.46|        4.01|            0.0|            0.63|
|        102|Single|       0.0|      0.0|             0.0|       0.0|         0.0|            0.0|            0.02|
|        100| STACK|      10.0|      0.0|             0.0|       0.0|         0.0|            0.0|            0.02|
|        101|Single|       0.0|      0.0|           13.36|     125.9|      417.93|            0.0|           20.91|
|        102|  PLUS|    188.67|      0.0|             0.0|       0.0|         0.0|            0.0|             0.0|
|        100| STACK|       0.0|      0.0|            1.81|       0.0|         0.0|            0.0|             0.0|
|        101| STACK|       0.0|     3.34|           -0.01|     53.97|       43.26|            0.0|            5.64|
|        102| STACK|    417.93|      0.0|             0.0|       0.0|         0.0|            0.0|             0.0|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+

From this dataframe I would like to do a sum of a column and add a new column with value as column_name of the which I am performing the sum

I have done like below

from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import lit


df1 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('INCOME_PDR'))\
.select(['MERCHANT_ID','RATE','INCOME_PDR','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('INCOME_PDR')\
.withColumnRenamed("sum(INCOME_PDR)", "AMOUNT_AUD")

df1.show()

+-----------+------+---------------------+----------+
|MERCHANT_ID|  RATE|ACQUIRED_REVENUE_CODE|AMOUNT_AUD|
+-----------+------+---------------------+----------+
|        100| STACK|           INCOME_PDR|      10.0|
|        101|Single|           INCOME_PDR|      8.03|
|        100|Single|           INCOME_PDR|       0.0|
|        102|  PLUS|           INCOME_PDR|    188.67|
|        102| STACK|           INCOME_PDR|    417.93|
|        101| STACK|           INCOME_PDR|       0.0|
|        102|Single|           INCOME_PDR|       0.0|
+-----------+------+---------------------+----------+

Same way for another column

df2 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('MULTI_PDR'))\
.select(['MERCHANT_ID','RATE','MULTI_PDR','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('MULTI_PDR')\
.withColumnRenamed("sum(MULTI_PDR)", "AMOUNT_AUD")

df2.show()

+-----------+------+---------------------+----------+
|MERCHANT_ID|  RATE|ACQUIRED_REVENUE_CODE|amount_aud|
+-----------+------+---------------------+----------+
|        102|  PLUS|            MULTI_PDR|       0.0|
|        102| STACK|            MULTI_PDR|       0.0|
|        101| STACK|            MULTI_PDR|      3.34|
|        102|Single|            MULTI_PDR|       0.0|
|        100| STACK|            MULTI_PDR|       0.0|
|        101|Single|            MULTI_PDR|       0.0|
|        100|Single|            MULTI_PDR|       0.0|
+-----------+------+---------------------+----------+

I want to do this for around 7 columns. I have done like below

3rd column

df3 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('TRANS_FEE_INCOME'))\
.select(['MERCHANT_ID','RATE','TRANS_FEE_INCOME','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('TRANS_FEE_INCOME')\
.withColumnRenamed("sum(TRANS_FEE_INCOME)", "AMOUNT_AUD")

column 4

df4 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('PDR_MARGIN'))\
.select(['MERCHANT_ID','RATE','PDR_MARGIN','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('PDR_MARGIN')\
.withColumnRenamed("sum(PDR_MARGIN)", "AMOUNT_AUD")

column 5

df5 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('INTER_CHANGE'))\
.select(['MERCHANT_ID','RATE','INTER_CHANGE','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('INTER_CHANGE')\
.withColumnRenamed("sum(INTER_CHANGE)", "AMOUNT_AUD")

column 6

df6 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('TOTAL_MULTI_PDR'))\
.select(['MERCHANT_ID','RATE','TOTAL_MULTI_PDR','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('TOTAL_MULTI_PDR')\
.withColumnRenamed("sum(TOTAL_MULTI_PDR)", "AMOUNT_AUD")

column 7

df7 = df.withColumn('ACQUIRED_REVENUE_CODE', lit('VOLUME_INC_MULTI'))\
.select(['MERCHANT_ID','RATE','VOLUME_INC_MULTI','ACQUIRED_REVENUE_CODE'])\
.groupby('MERCHANT_ID','RATE', 'ACQUIRED_REVENUE_CODE')\
.sum('VOLUME_INC_MULTI')\
.withColumnRenamed("sum(VOLUME_INC_MULTI)", "AMOUNT_AUD")

Then I want to a union of all the 7 dataframes created. i have done like below

dfs =[df1,df2,df3,df4,df5,df6,df7]

df8 = reduce(DataFrame.unionAll, dfs)

I am able to get what is required. I would like know if there is a better approach than what I have done

Upvotes: 0

Views: 1143

Answers (1)

Lamanus
Lamanus

Reputation: 13581

You can transform your dataframe by using flatMap.

flatMap will iterate for each row with the lambda function. After that, map converts the row element to the n-tuple by iterating the column index i such as

  • 1st: row[0] = value of MERCHANT_ID column
  • 2nd: row[1] = value of RATE column
  • 3rd: cols[i] = i-th element of cols where i from (2, len(cols))
  • 4th: row[i] = value of i-th column where i from (2, len(cols))
|MERCHANT_ID|  RATE|INCOME_PDR|MULTI_PDR|TRANS_FEE_INCOME|PDR_MARGIN|INTER_CHANGE|TOTAL_MULTI_PDR|VOLUME_INC_MULTI|
+-----------+------+----------+---------+----------------+----------+------------+---------------+----------------+
|        100|Single|       0.0|      0.0|             0.0|       0.0|         0.0|            0.0|            0.02|

For example, the 4-tuple is made from the first row as follows:

  • i=2: (100, 'Single', 'MULTI_PDR', 0.0) <-- column for i=2 is MULTI_PDR
  • i=3: (100, 'Single', 'TRANS_FEE_INCOME', 0.0) <-- column for i=2 is TRANS_FEE_INCOME
  • ...
  • i=len(cols): (100, 'Single', 'VOLUME_INC_MULTI', 0.02) <-- column for i=len(cols) is VOLUME_INC_MULTI

This is repeated for every row by flatMap

#schema = df.schema

#df2 = df.rdd.flatMap(lambda row: map(lambda i: (row[0], row[1], schema[i].name, row[i]), range(2, len(schema)) )) \
#        .toDF(['MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE', 'AMOUNT_AUD'])

cols = df.columns

df2 = df.rdd.flatMap(lambda row: map(lambda i: (row[0], row[1], cols[i], row[i]), range(2, len(cols)) )) \
        .toDF(['MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE', 'AMOUNT_AUD'])

import pyspark.sql.functions as f

df2.groupBy('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE') \
  .agg(f.sum('AMOUNT_AUD').alias('AMOUNT_AUD')) \
  .show(30, False)

+-----------+------+---------------------+----------+
|MERCHANT_ID|RATE  |ACQUIRED_REVENUE_CODE|AMOUNT_AUD|
+-----------+------+---------------------+----------+
|102        |STACK |TRANS_FEE_INCOME     |0.0       |
|100        |STACK |PDR_MARGIN           |0.0       |
|100        |STACK |TOTAL_MULTI_PDR      |0.0       |
|102        |STACK |MULTI_PDR            |0.0       |
|102        |STACK |PDR_MARGIN           |0.0       |
|102        |STACK |INCOME_PDR           |417.93    |
|100        |Single|INCOME_PDR           |0.0       |
|100        |Single|PDR_MARGIN           |0.0       |
|101        |STACK |INTER_CHANGE         |43.26     |
|100        |Single|TOTAL_MULTI_PDR      |0.0       |
|100        |STACK |INTER_CHANGE         |0.0       |
|102        |STACK |VOLUME_INC_MULTI     |0.0       |
|102        |Single|TOTAL_MULTI_PDR      |0.0       |
|102        |PLUS  |MULTI_PDR            |0.0       |
|100        |Single|VOLUME_INC_MULTI     |0.02      |
|101        |Single|TOTAL_MULTI_PDR      |0.0       |
|100        |Single|INTER_CHANGE         |0.0       |
|101        |STACK |INCOME_PDR           |0.0       |
|101        |STACK |TOTAL_MULTI_PDR      |0.0       |
|102        |Single|INTER_CHANGE         |0.0       |
|102        |STACK |TOTAL_MULTI_PDR      |0.0       |
|100        |Single|TRANS_FEE_INCOME     |0.0       |
|102        |Single|INCOME_PDR           |0.0       |
|101        |STACK |PDR_MARGIN           |53.97     |
|101        |Single|INCOME_PDR           |8.03      |
|100        |STACK |TRANS_FEE_INCOME     |1.81      |
|102        |PLUS  |INTER_CHANGE         |0.0       |
|102        |PLUS  |TRANS_FEE_INCOME     |0.0       |
|101        |Single|INTER_CHANGE         |421.94    |
|102        |Single|VOLUME_INC_MULTI     |0.02      |
+-----------+------+---------------------+----------+
only showing top 30 rows

Here is another way to do only with the dataframe:

cols = df.columns

cols.remove('MERCHANT_ID')
cols.remove('RATE')

import pyspark.sql.functions as f

df.withColumn('array', f.explode(f.arrays_zip(f.array(*map(lambda x: f.lit(x), cols)), f.array(*cols), ))) \
  .select('MERCHANT_ID', 'RATE', 'array.*') \
  .toDF('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE', 'AMOUNT_AUD') \
  .groupBy('MERCHANT_ID', 'RATE', 'ACQUIRED_REVENUE_CODE') \
  .agg(f.sum('AMOUNT_AUD').alias('AMOUNT_AUD')) \
  .show(30, False)

where the method is referenced from here.

Upvotes: 2

Related Questions