somnathchakrabarti

Reputation: 3128

Spark dataframe how to select columns using Seq[String]

Input JSON data format

Input schema:

```
root
 |-- class: string (nullable = true)
 |-- createdBy: string (nullable = true)
 |-- createdDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- lastModifiedBy: string (nullable = true)
 |-- lastModifiedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- planId: string (nullable = true)
 |-- planWeekDataFormatted: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- bbDemoImps: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- bbDemoImpsAttributes: struct (nullable = true)
 |    |    |    |    |    |-- demoId: string (nullable = true)
 |    |    |    |    |    |-- imps: long (nullable = true)
 |    |    |    |    |    |-- ue: long (nullable = true)
 |    |    |    |    |-- uuid: long (nullable = true)
 |    |    |-- demoValues: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- demoAttributes: struct (nullable = true)
 |    |    |    |    |    |-- cpm: long (nullable = true)
 |    |    |    |    |    |-- cpp: long (nullable = true)
 |    |    |    |    |    |-- demoId: string (nullable = true)
 |    |    |    |    |    |-- grps: long (nullable = true)
 |    |    |    |    |    |-- imps: long (nullable = true)
 |    |    |    |    |    |-- rcImps: long (nullable = true)
 |    |    |    |    |    |-- totalCpm: long (nullable = true)
 |    |    |    |    |    |-- totalGrps: long (nullable = true)
 |    |    |    |    |    |-- totalImps: long (nullable = true)
 |    |    |    |    |    |-- ue: long (nullable = true)
 |    |    |    |    |    |-- vpvh: long (nullable = true)
 |    |    |    |    |-- demoId: long (nullable = true)
 |    |    |-- hhDemo: struct (nullable = true)
 |    |    |    |-- demoId: string (nullable = true)
 |    |    |    |-- imps: long (nullable = true)
 |    |    |    |-- ue: long (nullable = true)
 |    |    |-- periodId: string (nullable = true)
 |    |    |-- rcPublishedDate: string (nullable = true)
 |    |    |-- unitRates: struct (nullable = true)
 |    |    |    |-- rate: long (nullable = true)
 |    |    |    |-- rcRate: long (nullable = true)
 |    |    |    |-- totalRate: long (nullable = true)
 |    |    |    |-- units: string (nullable = true)
 |    |    |-- uuid: long (nullable = true)
 |    |    |-- weekStartDate: long (nullable = true)
 |-- planWorkspaceProduct: struct (nullable = true)
 |    |-- channelId: string (nullable = true)
 |    |-- commercialTypeId: string (nullable = true)
 |    |-- lineClassAttributes: struct (nullable = true)
 |    |    |-- canExport: boolean (nullable = true)
 |    |    |-- canInvoice: boolean (nullable = true)
 |    |    |-- canProduce: boolean (nullable = true)
 |    |    |-- guaranteedAudience: long (nullable = true)
 |    |    |-- guaranteedRate: long (nullable = true)
 |    |    |-- hasPerformance: boolean (nullable = true)
 |    |    |-- planAudience: long (nullable = true)
 |    |    |-- planRate: long (nullable = true)
 |    |-- lineClassId: string (nullable = true)
 |    |-- lineId: string (nullable = true)
 |    |-- lineNo: struct (nullable = true)
 |    |    |-- $numberLong: string (nullable = true)
 |    |-- planProductId: string (nullable = true)
 |    |-- productId: string (nullable = true)
 |    |-- spotLengthId: string (nullable = true)
 |-- rates: struct (nullable = true)
 |    |-- period: struct (nullable = true)
 |    |    |-- endDate: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- startDate: long (nullable = true)
 |-- version: struct (nullable = true)
 |    |-- $numberLong: string (nullable = true)
 |-- offsets: integer (nullable = true)
 |-- modifiedTime: long (nullable = true)
 |-- opCode: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- tenant: string (nullable = true)
 |-- etl_timestamp: long (nullable = false)
 |-- topic: string (nullable = true)
```

Expected output schema

```
root
 |-- class: string (nullable = true)
 |-- createdBy: string (nullable = true)
 |-- lastModifiedBy: string (nullable = true)
 |-- planId: string (nullable = true)
 |-- offsets: integer (nullable = true)
 |-- modifiedTime: long (nullable = true)
 |-- opCode: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- tenant: string (nullable = true)
 |-- etl_timestamp: long (nullable = false)
 |-- topic: string (nullable = true)
 |-- createdDate_$date: long (nullable = true)
 |-- id_$oid: string (nullable = true)
 |-- lastModifiedDate_$date: long (nullable = true)
 |-- planWorkspaceProduct_channelId: string (nullable = true)
 |-- planWorkspaceProduct_commercialTypeId: string (nullable = true)
 |-- planWorkspaceProduct_lineClassId: string (nullable = true)
 |-- planWorkspaceProduct_lineId: string (nullable = true)
 |-- planWorkspaceProduct_planProductId: string (nullable = true)
 |-- planWorkspaceProduct_productId: string (nullable = true)
 |-- planWorkspaceProduct_spotLengthId: string (nullable = true)
 |-- version_$numberLong: string (nullable = true)
 |-- planWeekDataFormatted_periodId: string (nullable = true)
 |-- planWeekDataFormatted_rcPublishedDate: string (nullable = true)
 |-- planWeekDataFormatted_weekStartDate: long (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_canExport: boolean (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_canInvoice: boolean (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_canProduce: boolean (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_guaranteedAudience: long (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_guaranteedRate: long (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_hasPerformance: boolean (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_planAudience: long (nullable = true)
 |-- planWorkspaceProduct_lineClassAttributes_planRate: long (nullable = true)
 |-- planWorkspaceProduct_lineNo_$numberLong: string (nullable = true)
 |-- rates_period_endDate: long (nullable = true)
 |-- rates_period_name: string (nullable = true)
 |-- rates_period_startDate: long (nullable = true)
 |-- planWeekDataFormatted_hhDemo_demoId: string (nullable = true)
 |-- planWeekDataFormatted_unitRates_rate: long (nullable = true)
 |-- planWeekDataFormatted_unitRates_rcRate: long (nullable = true)
 |-- planWeekDataFormatted_unitRates_totalRate: long (nullable = true)
 |-- planWeekDataFormatted_unitRates_units: string (nullable = true)
 |-- planWeekDataFormatted_bbDemoImps_bbDemoImpsAttributes_demoId: string (nullable = true)
 |-- planWeekDataFormatted_demoValues_demoAttributes_demoId: string (nullable = true)
```

The key columns are `planWeekDataFormatted_hhDemo_demoId`, `planWeekDataFormatted_bbDemoImps_bbDemoImpsAttributes_demoId` and `planWeekDataFormatted_demoValues_demoAttributes_demoId`: the `demoId` values pulled out of the nested structures.
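Most of the expected columns are just nested struct fields renamed from a dotted path to an underscore-joined alias (e.g. `rates.period.endDate` becomes `rates_period_endDate`). A minimal sketch of generating those pairs recursively is below. It uses a hypothetical, simplified schema model (`DType`, `Leaf`, `Struct` are illustrative names, not Spark classes) so that it runs without Spark.

```scala
// Hypothetical, simplified schema model; Spark's real types are StructType/StructField.
sealed trait DType
case object Leaf extends DType                                   // any non-struct field
case class Struct(fields: Seq[(String, DType)]) extends DType    // a struct with named children

// Walks the struct tree and returns (dotted path, underscore alias) for every leaf field.
def flatten(t: DType, prefix: List[String] = Nil): Seq[(String, String)] = t match {
  case Struct(fields) =>
    fields.flatMap { case (name, child) => flatten(child, prefix :+ name) }
  case Leaf =>
    Seq((prefix.mkString("."), prefix.mkString("_")))
}

// A small excerpt of the input schema from the question.
val schema = Struct(Seq(
  "class" -> Leaf,
  "createdDate" -> Struct(Seq("$date" -> Leaf)),
  "rates" -> Struct(Seq(
    "period" -> Struct(Seq("endDate" -> Leaf, "name" -> Leaf, "startDate" -> Leaf))
  ))
))

flatten(schema).foreach { case (path, alias) => println(s"$path -> $alias") }
```

In Spark itself the same recursion would walk the `StructField`s of a `StructType` (using `name` and `dataType`), and each pair could become `col(path).as(alias)` in a single `select(...: _*)`. Array fields such as `planWeekDataFormatted` would still have to be exploded first, as the question does.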

I'm trying the code below to explode the ArrayType column `planWeekDataFormatted`, then the nested ArrayType columns `bbDemoImps` and `demoValues`, and to extract only the `demoId` from each object in those arrays.

```scala
import org.apache.spark.sql.functions.{col, explode}
import spark.implicits._ // for the $"..." syntax; assumes a SparkSession named `spark`

// get all columns from resultDF, except the "planWeekDataFormatted" column
val dfwithoutPlanWeekData = resultDF.drop("planWeekDataFormatted")
val colsWithoutPlanWeekData = dfwithoutPlanWeekData.columns.toSeq

val planweek_exploded = resultDF.withColumn("planWeekItem", explode($"planWeekDataFormatted"))
  .withColumn("bbDemoImpsAttribute", explode($"planWeekItem.bbDemoImps"))
  .withColumn("demoValuesAttribute", explode($"planWeekItem.demoValues"))
  .withColumn("hhDemoAttribute", $"planWeekItem.hhDemo")
  .select(
    colsWithoutPlanWeekData.map(c => col(c)): _*,
    col("bbDemoImpsAttribute.bbDemoImpsAttributes.demoId").as("bbDemoId"),
    col("demoValuesAttribute.demoAttributes.demoId").as("demoId"),
    col("hhDemoAttribute.demoId").as("hhDemoId")
  ).drop("planWeekItem", "bbDemoImpsAttribute", "demoValuesAttribute", "hhDemoAttribute")
```
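The explode chain above can be modeled in plain Scala: `explode` emits one output row per array element, much like `flatMap` on a collection. The sketch below uses hypothetical case classes (`BbDemoImp`, `DemoValue`, `WeekItem`, `Plan`, simplified from the schema, not Spark types) to show one consequence of exploding the two sibling arrays `bbDemoImps` and `demoValues` one after the other: every element of one is paired with every element of the other, rather than zipped.

```scala
// Hypothetical, simplified stand-ins for the nested schema (not Spark classes).
case class BbDemoImp(demoId: String)
case class DemoValue(demoId: String)
case class WeekItem(periodId: String, bbDemoImps: Seq[BbDemoImp], demoValues: Seq[DemoValue])
case class Plan(planId: String, planWeekDataFormatted: Seq[WeekItem])

// Each generator plays the role of one explode: one output element per array element.
def explodeAll(plans: Seq[Plan]): Seq[(String, String, String, String)] =
  for {
    plan <- plans
    week <- plan.planWeekDataFormatted // ~ explode($"planWeekDataFormatted")
    bb   <- week.bbDemoImps            // ~ explode($"planWeekItem.bbDemoImps")
    dv   <- week.demoValues            // ~ explode($"planWeekItem.demoValues")
  } yield (plan.planId, week.periodId, bb.demoId, dv.demoId)

val plans = Seq(
  Plan("p1", Seq(
    WeekItem("w1",
      Seq(BbDemoImp("b1"), BbDemoImp("b2")),
      Seq(DemoValue("d1"), DemoValue("d2"), DemoValue("d3")))
  ))
)

val rows = explodeAll(plans)
// 2 bbDemoImps x 3 demoValues: one week item becomes 6 rows
println(rows.size)
```

As with Spark's `explode`, an empty array yields no rows at all in this model; Spark's `explode_outer` is the variant that keeps such rows with nulls.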

Spark won't let me pass the columns mapped from the `Seq[String]` to `select` together with the other columns.

I get the following compile error:

> overloaded method value select with alternatives:   [U1, U2, U3,
> U4](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1],
> c2: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U2], c3:
> org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U3], c4:
> org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U4])org.apache.spark.sql.Dataset[(U1,
> U2, U3, U4)] <and>   (col: String,cols:
> String*)org.apache.spark.sql.DataFrame <and>   (cols:
> org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame  cannot be
> applied to (String, org.apache.spark.sql.Column,
> org.apache.spark.sql.Column, org.apache.spark.sql.Column)
>       .select(

Upvotes: 0

Views: 1641

Answers (1)

blackbishop

Reputation: 32700

Use:

```scala
.select(
  (colsWithoutPlanWeekData.map(c => col(c)) ++ Seq(
    col("bbDemoImpsAttribute.bbDemoImpsAttributes.demoId").as("bbDemoId"),
    col("demoValuesAttribute.demoAttributes.demoId").as("demoId"),
    col("hhDemoAttribute.demoId").as("hhDemoId")
  )): _*
)
```

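Concatenate the two `Seq`s first, then apply the `: _*` varargs expansion to the combined sequence. In Scala, a `: _*` argument must be the only argument supplied for the repeated parameter, so in `select(cols: Column*)` it cannot be followed by additional `Column` arguments.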
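The rule can be reproduced without Spark. In this sketch, `Column`, `col` and `FakeDF.select` are hypothetical stand-ins that only mirror the shape of the two `select` overloads named in the error, `(col: String, cols: String*)` and `(cols: Column*)`. The commented-out call has the same shape as the one in the question and is rejected by the compiler; building one `Seq[Column]` first and expanding it with `: _*` compiles.

```scala
// Hypothetical stand-ins mirroring the shape of Spark's select overloads (not Spark classes).
case class Column(name: String) {
  def as(alias: String): Column = Column(s"$name AS $alias")
}
def col(name: String): Column = Column(name)

object FakeDF {
  def select(col: String, cols: String*): Seq[String] = col +: cols
  def select(cols: Column*): Seq[String] = cols.map(_.name)
}

val base = Seq("class", "createdBy", "planId")

// Does NOT compile: a `: _*` argument cannot be mixed with further arguments
// for the same repeated parameter.
// FakeDF.select(base.map(col): _*, col("x.demoId").as("demoId"))

// Compiles: concatenate into a single Seq[Column], then expand it with `: _*`.
val selected = FakeDF.select(
  (base.map(col) ++ Seq(col("x.demoId").as("demoId"))): _*
)
println(selected)
```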

Upvotes: 1
