DataQuest5

Reputation: 59

Complex Pivot-Unpivot in Spark Scala

I am using Spark 2.4 with Scala.

My Salesperson DataFrame has 54 salesperson columns in total; the example below shows only 4 of them.

Schema of SalesPerson table.

root
 |-- col: struct (nullable = false)
 |    |-- salesperson_4: string (nullable = true)
 |    |-- salesperson_10: string (nullable = true)
 |    |-- salesperson_11: string (nullable = true)
 |    |-- salesperson_21: string (nullable = true)

Data of Salesperson Table.

+--------------+--------------+--------------+--------------+
|salesperson_4 |salesperson_10|salesperson_11|salesperson_21|
+--------------+--------------+--------------+--------------+
|Customer_933  |Customer_1760 |Customer_454  |Customer_127  |
|Customer_1297 |Customer_2411 |Customer_158  |Customer_2703 |
|Customer_861  |Customer_1550 |Customer_812  |Customer_2976 |
+--------------+--------------+--------------+--------------+

My salesType dataframe looks like

Schema of salesType

root
 |-- Type: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of salesType

+------+-------------+
|Type  |Customer     |
+------+-------------+
|Online|Customer_933 |
|inshop|Customer_933 |
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online|Customer_812 |
|Online|Customer_812 |
|inshop|Customer_127 |
+------+-------------+

I am trying to check which customers from the Salesperson table are present in the SalesType table, with two additional columns: one showing which salesperson each customer belongs to, and one showing the count of that customer's occurrences in the SalesType table. In other words: every customer from the Salesperson table, together with its presence in the SalesType table.

Expected Output:
+----------------------------+-------------+---------+
|CustomerBelongstoSalesperson|Customer     |Occurance|
+----------------------------+-------------+---------+
|salesperson_4               |Customer_933 |2        |
|salesperson_10              |Customer_2411|2        |
|salesperson_4               |Customer_1297|1        |
|salesperson_10              |Customer_1550|1        |
|salesperson_21              |Customer_2976|1        |
|salesperson_11              |Customer_812 |2        |
|salesperson_21              |Customer_127 |1        |
|salesperson_4               |Customer_861 |0        |
|salesperson_10              |Customer_1760|0        |
|salesperson_11              |Customer_454 |0        |
|salesperson_11              |Customer_158 |0        |
|salesperson_21              |Customer_2703|0        |
+----------------------------+-------------+---------+

Code:

// df1 is the Salesperson DataFrame, df2 is the salesType DataFrame
// Unpivot: one (Salesperson, Customer) row per salesperson column
val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

// Left join keeps unmatched customers; count("Type") counts matches per customer
processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)
  

Thanks a lot. Please share your suggestions.

Upvotes: 0

Views: 185

Answers (1)

sathya

Reputation: 2072

This works in Spark 2.4.0+:

import org.apache.spark.sql.functions.{count, first}
import spark.implicits._ // toDF on a local Seq needs the session's implicits

val sourceDF = Seq(
    ("Customer_933","Customer_1760","Customer_454","Customer_127"),
    ("Customer_1297","Customer_2411","Customer_158","Customer_2703"),
    ("Customer_861","Customer_1550","Customer_812","Customer_2976")
).toDF("salesperson_4","salesperson_10","salesperson_11","salesperson_21")
sourceDF.show()

/*
+-------------+--------------+--------------+--------------+
|salesperson_4|salesperson_10|salesperson_11|salesperson_21|
+-------------+--------------+--------------+--------------+
| Customer_933| Customer_1760|  Customer_454|  Customer_127|
|Customer_1297| Customer_2411|  Customer_158| Customer_2703|
| Customer_861| Customer_1550|  Customer_812| Customer_2976|
+-------------+--------------+--------------+--------------+
*/

val salesDF = Seq(
    ("Online","Customer_933"),
    ("inshop","Customer_933"),
    ("inshop","Customer_1297"),
    ("Online","Customer_2411"),
    ("Online","Customer_2411"),
    ("Online","Customer_1550"),
    ("Online","Customer_2976"),
    ("Online","Customer_812"),
    ("Online","Customer_812"),
    ("inshop","Customer_127")
).toDF("Type","Customer")

salesDF.show()

/*
+------+-------------+
|  Type|     Customer|
+------+-------------+
|Online| Customer_933|
|inshop| Customer_933|
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online| Customer_812|
|Online| Customer_812|
|inshop| Customer_127|
+------+-------------+
*/

// Build the stack() argument list: "'col', cast(`col` as string)" per column
val stringCol = sourceDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
// Unpivot the wide table into (Salesperson, Customer) rows
val processedDF = sourceDF.selectExpr(s"stack(${sourceDF.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)

/*
+--------------+-------------+
|Salesperson   |Customer     |
+--------------+-------------+
|salesperson_4 |Customer_933 |
|salesperson_10|Customer_1760|
|salesperson_11|Customer_454 |
|salesperson_21|Customer_127 |
|salesperson_4 |Customer_1297|
|salesperson_10|Customer_2411|
|salesperson_11|Customer_158 |
|salesperson_21|Customer_2703|
|salesperson_4 |Customer_861 |
|salesperson_10|Customer_1550|
|salesperson_11|Customer_812 |
|salesperson_21|Customer_2976|
+--------------+-------------+
*/
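For the four example columns, the `selectExpr` string built above expands to the following literal `stack` call. This is just a sketch to show what is generated (`expandedDF` is an illustrative name, not part of the original code):

// Hand-written equivalent of the dynamically built stack() expression
val expandedDF = sourceDF.selectExpr(
  "stack(4, " +
    "'salesperson_4', cast(`salesperson_4` as string), " +
    "'salesperson_10', cast(`salesperson_10` as string), " +
    "'salesperson_11', cast(`salesperson_11` as string), " +
    "'salesperson_21', cast(`salesperson_21` as string)" +
  ") as (Salesperson, Customer)")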


// Left join keeps unmatched customers; count("Type") skips nulls, so they get 0
processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)

/*
+-------------+---------+--------------+
|Customer     |Occurance|Salesperson   |
+-------------+---------+--------------+
|Customer_2411|2        |salesperson_10|
|Customer_158 |0        |salesperson_11|
|Customer_812 |2        |salesperson_11|
|Customer_1760|0        |salesperson_10|
|Customer_2703|0        |salesperson_21|
|Customer_861 |0        |salesperson_4 |
|Customer_127 |1        |salesperson_21|
|Customer_2976|1        |salesperson_21|
|Customer_1297|1        |salesperson_4 |
|Customer_454 |0        |salesperson_11|
|Customer_933 |2        |salesperson_4 |
|Customer_1550|1        |salesperson_10|
+-------------+---------+--------------+

*/
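Since each customer belongs to exactly one salesperson here, the `first("Salesperson")` aggregate can also be avoided by grouping on both keys. A minimal variant of the same join (same rows, possibly in a different order):

// Group by both columns; count("Type") still skips the nulls from the left join
processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Salesperson", "Customer")
  .agg(count("Type").as("Occurance"))
  .show(false)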

Upvotes: 1
