Reputation: 59
I am using Spark 2.4 with Scala.
My salesperson DataFrame looks like the following. It has 54 salesperson columns in total; I am showing only 4 of them as an example.
Schema of SalesPerson table.
root
 |-- col: struct (nullable = false)
 |    |-- salesperson_4: string (nullable = true)
 |    |-- salesperson_10: string (nullable = true)
 |    |-- salesperson_11: string (nullable = true)
 |    |-- salesperson_21: string (nullable = true)
Data of Salesperson Table.
+--------------+--------------+--------------+--------------+
|salesperson_4 |salesperson_10|salesperson_11|salesperson_21|
+--------------+--------------+--------------+--------------+
|Customer_933  |Customer_1760 |Customer_454  |Customer_127  |
|Customer_1297 |Customer_2411 |Customer_158  |Customer_2703 |
|Customer_861  |Customer_1550 |Customer_812  |Customer_2976 |
+--------------+--------------+--------------+--------------+
My salesType DataFrame looks like this:
Schema of salesType
root
 |-- Type: string (nullable = true)
 |-- Customer: string (nullable = true)
Data of salesType
+------+-------------+
|Type  |Customer     |
+------+-------------+
|Online|Customer_933 |
|inshop|Customer_933 |
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online|Customer_812 |
|Online|Customer_812 |
|inshop|Customer_127 |
+------+-------------+
I am trying to check which customers from the Salesperson table are present in the SalesType table, with two additional columns: one showing which salesperson the customer belongs to, and one with the count of the customer's occurrences in the SalesType table. Basically, I want every customer from the Salesperson table together with its occurrence count in the SalesType table.
Expected Output:
+----------------------------+-------------+---------+
|CustomerBelongstoSalesperson|Customer     |Occurance|
+----------------------------+-------------+---------+
|salesperson_4               |Customer_933 |2        |
|salesperson_10              |Customer_2411|2        |
|salesperson_4               |Customer_1297|1        |
|salesperson_10              |Customer_1550|1        |
|salesperson_21              |Customer_2976|1        |
|salesperson_11              |Customer_812 |2        |
|salesperson_21              |Customer_127 |1        |
|salesperson_4               |Customer_861 |0        |
|salesperson_10              |Customer_1760|0        |
|salesperson_11              |Customer_454 |0        |
|salesperson_11              |Customer_158 |0        |
|salesperson_21              |Customer_2703|0        |
+----------------------------+-------------+---------+
Code:
// df1 is the salesperson DataFrame, df2 the salesType DataFrame
val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)
processedDF.join(df2, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Place").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)
Thanks a lot. Please share your suggestions.
Upvotes: 0
Views: 185
Reputation: 2072
This works in Spark 2.4.0+:
import spark.implicits._ // needed for .toDF on a local Seq (in scope automatically in spark-shell)
import org.apache.spark.sql.functions.{count, first} // used in the aggregation below

val sourceDF = Seq(
  ("Customer_933","Customer_1760","Customer_454","Customer_127"),
  ("Customer_1297","Customer_2411","Customer_158","Customer_2703"),
  ("Customer_861","Customer_1550","Customer_812","Customer_2976")
).toDF("salesperson_4","salesperson_10","salesperson_11","salesperson_21")
sourceDF.show()
/*
+-------------+--------------+--------------+--------------+
|salesperson_4|salesperson_10|salesperson_11|salesperson_21|
+-------------+--------------+--------------+--------------+
| Customer_933| Customer_1760| Customer_454| Customer_127|
|Customer_1297| Customer_2411| Customer_158| Customer_2703|
| Customer_861| Customer_1550| Customer_812| Customer_2976|
+-------------+--------------+--------------+--------------+
*/
val salesDF = Seq(
  ("Online","Customer_933"),
  ("inshop","Customer_933"),
  ("inshop","Customer_1297"),
  ("Online","Customer_2411"),
  ("Online","Customer_2411"),
  ("Online","Customer_1550"),
  ("Online","Customer_2976"),
  ("Online","Customer_812"),
  ("Online","Customer_812"),
  ("inshop","Customer_127")
).toDF("Type","Customer")
salesDF.show()
/*
+------+-------------+
| Type| Customer|
+------+-------------+
|Online| Customer_933|
|inshop| Customer_933|
|inshop|Customer_1297|
|Online|Customer_2411|
|Online|Customer_2411|
|Online|Customer_1550|
|Online|Customer_2976|
|Online| Customer_812|
|Online| Customer_812|
|inshop| Customer_127|
+------+-------------+
*/
// Unpivot: stack(n, 'name1', value1, 'name2', value2, ...) turns the wide
// salesperson columns into (Salesperson, Customer) rows.
val stringCol = sourceDF.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
val processedDF = sourceDF.selectExpr(s"stack(${sourceDF.columns.length}, $stringCol) as (Salesperson, Customer)")
processedDF.show(false)
/*
+--------------+-------------+
|Salesperson |Customer |
+--------------+-------------+
|salesperson_4 |Customer_933 |
|salesperson_10|Customer_1760|
|salesperson_11|Customer_454 |
|salesperson_21|Customer_127 |
|salesperson_4 |Customer_1297|
|salesperson_10|Customer_2411|
|salesperson_11|Customer_158 |
|salesperson_21|Customer_2703|
|salesperson_4 |Customer_861 |
|salesperson_10|Customer_1550|
|salesperson_11|Customer_812 |
|salesperson_21|Customer_2976|
+--------------+-------------+
*/
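The stack() expression above is a SQL-flavored unpivot. If you prefer staying in the DataFrame API, the same reshaping can be done by exploding an array of structs; this is only an equivalent sketch, not required for the solution:

import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

// One struct per salesperson column, gathered into an array and exploded
// into one (Salesperson, Customer) row per element.
val unpivotedDF = sourceDF.select(
  explode(array(sourceDF.columns.map { c =>
    struct(lit(c).as("Salesperson"), col(c).cast("string").as("Customer"))
  }: _*)).as("pair")
).select("pair.Salesperson", "pair.Customer")

unpivotedDF.show(false) // same rows as processedDF above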
processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .show(false)
/*
+-------------+---------+--------------+
|Customer |Occurance|Salesperson |
+-------------+---------+--------------+
|Customer_2411|2 |salesperson_10|
|Customer_158 |0 |salesperson_11|
|Customer_812 |2 |salesperson_11|
|Customer_1760|0 |salesperson_10|
|Customer_2703|0 |salesperson_21|
|Customer_861 |0 |salesperson_4 |
|Customer_127 |1 |salesperson_21|
|Customer_2976|1 |salesperson_21|
|Customer_1297|1 |salesperson_4 |
|Customer_454 |0 |salesperson_11|
|Customer_933 |2 |salesperson_4 |
|Customer_1550|1 |salesperson_10|
+-------------+---------+--------------+
*/
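Note that count("Type") only counts non-null values, so customers that find no match in the left join naturally come out with Occurance = 0. If you also want the exact column names from the expected output, a follow-up select can rename them; the snippet below is just a sketch reusing the same aggregation:

import org.apache.spark.sql.functions.{col, count, desc, first}

// Same join/aggregation as above, renamed to match the asked-for header
// (CustomerBelongstoSalesperson | Customer | Occurance).
val resultDF = processedDF.join(salesDF, Seq("Customer"), "left")
  .groupBy("Customer")
  .agg(count("Type").as("Occurance"), first("Salesperson").as("Salesperson"))
  .select(
    col("Salesperson").as("CustomerBelongstoSalesperson"),
    col("Customer"),
    col("Occurance"))

// Sorting by Occurance descending roughly reproduces the expected ordering.
resultDF.orderBy(desc("Occurance")).show(false)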
Upvotes: 1