DataQuest5

Reputation: 59

Column Name inside column of dataframe in spark with scala

I am using Spark 2.4.3 with Scala.

My SalesPerson DataFrame has 54 salesperson columns in total; the example below shows only 3 of them.

Schema of SalesPerson table.
root
 |-- col: struct (nullable = false)
 |    |-- SalesPerson_1: string (nullable = true)
 |    |-- SalesPerson_2: string (nullable = true)
 |    |-- SalesPerson_3: string (nullable = true)

Data of the SalesPerson view:

    SalesPerson_1|SalesPerson_2|SalesPerson_3
    [Customer_1793, Customer_202, Customer_2461]
    [Customer_2424, Customer_130, Customer_787]
    [Customer_1061, Customer_318, Customer_706]

My SalesPlace DataFrame looks like:

Schema of SalesPlace:
 root
 |-- Place: string (nullable = true)
 |-- Customer: string (nullable = true)

Data of SalesPlace:

    Place |Customer
    Online|Customer_1793
    Retail|Customer_1793
    Retail|Customer_130
    Online|Customer_130
    Online|Customer_2461
    Retail|Customer_2461
    Online|Customer_2461

I am trying to check which customers from the SalesPerson table are present in the SalesPlace table, with two additional columns: one showing which salesperson the customer belongs to, and one counting how often the customer occurs in the SalesPlace table.

Expected Output:

CustomerBelongstoSalesperson|Customer     |Occurrence
SalesPerson_1               |Customer_1793|2
SalesPerson_2               |Customer_130 |2
SalesPerson_3               |Customer_2461|3
SalesPerson_2               |Customer_202 |0
SalesPerson_1               |Customer_2424|0
SalesPerson_1               |Customer_1061|0
SalesPerson_2               |Customer_318 |0
SalesPerson_3               |Customer_787 |0

Code:

Error:
The number of aliases supplied in the AS clause does not match the number of columns output by the UDTF expected 54 aliases but got Salesperson,Customer ;

This seems a bit tricky in Spark. I am not sure whether it is possible to bring the column name inside the column as a value. Could someone please give me an idea of how to do this? Thanks.
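
For context: since the salesperson fields here are nested inside the struct column col, they can be promoted to top-level columns before any unpivoting. A minimal sketch, assuming the DataFrame is named salesPersonDF (the name is hypothetical):

    // Hypothetical name: salesPersonDF holds the struct schema shown above.
    // Star-expanding the struct promotes SalesPerson_1 ... SalesPerson_54
    // to ordinary top-level string columns.
    val flatSalesPersonDF = salesPersonDF.select("col.*")
    flatSalesPersonDF.printSchema()
    // root
    //  |-- SalesPerson_1: string (nullable = true)
    //  |-- SalesPerson_2: string (nullable = true)
    //  ...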

Upvotes: 0

Views: 223

Answers (1)

Som

Reputation: 6338

Try this-

Load the test data provided

    import org.apache.spark.sql.functions._ // count, first used below
    import spark.implicits._                // enables .toDS(); assumes an existing SparkSession named spark

    val data1 =
      """
        |salesperson1|salesperson2
        |Customer_17 |Customer_202
        |Customer_24 |Customer_130
      """.stripMargin
    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +------------+------------+
      * |salesperson1|salesperson2|
      * +------------+------------+
      * |Customer_17 |Customer_202|
      * |Customer_24 |Customer_130|
      * +------------+------------+
      *
      * root
      * |-- salesperson1: string (nullable = true)
      * |-- salesperson2: string (nullable = true)
      */

    val data2 =
      """
        |Place|Customer
        |shop |Customer_17
        |Home |Customer_17
        |shop |Customer_17
        |Home |Customer_130
        |Shop |Customer_202
      """.stripMargin
    val stringDS2 = data2.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df2 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS2)
    df2.show(false)
    df2.printSchema()
    /**
      * +-----+------------+
      * |Place|Customer    |
      * +-----+------------+
      * |shop |Customer_17 |
      * |Home |Customer_17 |
      * |shop |Customer_17 |
      * |Home |Customer_130|
      * |Shop |Customer_202|
      * +-----+------------+
      *
      * root
      * |-- Place: string (nullable = true)
      * |-- Customer: string (nullable = true)
      */

Unpivot and left join

    val stringCol = df1.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    val processedDF = df1.selectExpr(s"stack(${df1.columns.length}, $stringCol) as (Salesperson, Customer)")
    processedDF.show(false)
    /**
      * +------------+------------+
      * |Salesperson |Customer    |
      * +------------+------------+
      * |salesperson1|Customer_17 |
      * |salesperson2|Customer_202|
      * |salesperson1|Customer_24 |
      * |salesperson2|Customer_130|
      * +------------+------------+
      */
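
For the two test columns, the generated stringCol expands to 'salesperson1', cast(`salesperson1` as string), 'salesperson2', cast(`salesperson2` as string), so the selectExpr above is equivalent to the literal form below. stack(n, ...) emits n rows per input row, each with two output columns here, which is why exactly two aliases (Salesperson, Customer) are needed; a mismatch between the alias list and the UDTF's output columns is what produced the error in the question.

    // Equivalent literal form of the dynamically built expression above:
    df1.selectExpr(
      "stack(2, 'salesperson1', cast(salesperson1 as string), " +
        "'salesperson2', cast(salesperson2 as string)) as (Salesperson, Customer)")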

    processedDF.join(df2, Seq("Customer"), "left")
      .groupBy("Customer")
      .agg(count("Place").as("Occurrence"), first("Salesperson").as("Salesperson"))
      .show(false)

    /**
      * +------------+----------+------------+
      * |Customer    |Occurrence|Salesperson |
      * +------------+----------+------------+
      * |Customer_130|1         |salesperson2|
      * |Customer_17 |3         |salesperson1|
      * |Customer_202|1         |salesperson2|
      * |Customer_24 |0         |salesperson1|
      * +------------+----------+------------+
      */
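
Two details worth noting: count("Place") ignores the nulls introduced by the left join, which is what yields the 0 rows for unmatched customers, and first("Salesperson") is only safe because each customer appears under exactly one salesperson. Against the question's actual schema, the same steps apply after flattening the struct. A sketch, assuming hypothetical DataFrame names salesPersonDF (the struct schema) and salesPlaceDF (Place/Customer), with the imports from above in scope:

    // Hypothetical names: salesPersonDF and salesPlaceDF as in the question.
    val flat = salesPersonDF.select("col.*")  // promote the 54 struct fields to columns
    val exprs = flat.columns.map(c => s"'$c', cast(`$c` as string)").mkString(", ")
    val unpivoted = flat.selectExpr(
      s"stack(${flat.columns.length}, $exprs) as (Salesperson, Customer)")
    unpivoted.join(salesPlaceDF, Seq("Customer"), "left")
      .groupBy("Customer")
      .agg(count("Place").as("Occurrence"), first("Salesperson").as("Salesperson"))
      .show(false)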

Upvotes: 1
