theQi

Reputation: 33

Using case class to add unknown columns as null

I am creating a new dataframe (defined by a case class) from an input dataframe that may have fewer or different columns than the existing one. I am trying to use the case class to set the values of the missing columns to null.

I am using this case class to define the new dataframe to be created:

The input dataframe (incomingDf) may not have all of the variable fields that default to null below.

import java.sql.Date  // assuming java.sql.Date, which Spark's encoders map to DateType

case class existingSchema(source_key: Int
                        , sequence_number: Int
                        , subscriber_id: String
                        , subscriber_ssn: String
                        , last_name: String
                        , first_name: String
                        , variable1: String = null
                        , variable2: String = null
                        , variable3: String = null
                        , variable4: String = null
                        , variable5: String = null
                        , source_date: Date
                        , load_date: Date
                        , file_name_String: String)

val incomingDf = spark.table("raw.incoming")

val formattedDf = incomingDf.as[existingSchema].toDF()

This throws an error at compile time.

The formattedDf is expected to have the same schema as the case class existingSchema.

incomingDf.printSchema
root
 |-- source_key: integer (nullable = true)
 |-- sequence_number: integer (nullable = true)
 |-- subscriber_id: string (nullable = true)
 |-- subscriber_ssn: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- variable1: string (nullable = true)
 |-- variable3: string (nullable = true)
 |-- source_date: date (nullable = true)
 |-- load_date: date (nullable = true)
 |-- file_name_string: string (nullable = true)

Compile Error:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
    val formattedDf = incomingDf.as[existingSchema].toDF()
                                                     ^
one error found
FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':compileScala'.
> Compilation failed

Update: I added the code line:

import incomingDf.sparkSession.implicits._

and now it compiles fine.

Now I get the following error at run time:

19/04/17 14:37:56 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: cannot resolve '`variable2`' given input columns: [variable1, variable3, sequence_number, last_name, first_name, file_name_string, subscriber_id, load_date, source_key];
org.apache.spark.sql.AnalysisException: cannot resolve '`variable2`' given input columns: [variable1, variable3, sequence_number, last_name, first_name, file_name_string, subscriber_id, load_date, source_key];
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:88)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:85)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)

Upvotes: 2

Views: 2235

Answers (2)

shay__

Reputation: 3990

The incoming dataframe is missing some of the case class's String fields. You just need to add them explicitly:

import org.apache.spark.sql.functions.lit

val formattedDf = Seq("variable2", "variable4", "variable5")
  .foldLeft(incomingDf)((df, col) => {
    df.withColumn(col, lit(null.asInstanceOf[String]))
  }).as[existingSchema].toDF()

A more general solution would be to infer the missing fields.
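For example, a minimal sketch of that inference (my own illustration, not from the original answer; it compares column names case-insensitively to match Spark's default resolution):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.lit

// Derive the target schema from the case class itself.
val targetSchema = Encoders.product[existingSchema].schema

// Find the case class fields the incoming dataframe lacks
// (lowercase comparison, since Spark resolves columns case-insensitively by default).
val existingCols = incomingDf.columns.map(_.toLowerCase).toSet
val missingFields = targetSchema.filterNot(f => existingCols.contains(f.name.toLowerCase))

// Add each missing field as a typed null column, then convert as before.
val formattedDf = missingFields
  .foldLeft(incomingDf)((df, field) => df.withColumn(field.name, lit(null).cast(field.dataType)))
  .as[existingSchema].toDF()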

Upvotes: 1

kingledion

Reputation: 2500

You probably want to define your DF schema explicitly. For example:

import org.apache.spark.sql.types._

val newSchema: StructType = StructType(Array(
  StructField("nested_array", ArrayType(ArrayType(StringType)), true),
  StructField("numbers", IntegerType, true),
  StructField("text", StringType, true)
))

// Given a DataFrame df...
val combinedSchema = StructType(df.schema ++ newSchema)
val resultRDD = ... // here, process df to add rows or whatever and get the result as an RDD
                    // you can get an RDD as simply as df.rdd
val outDf = sparkSession.createDataFrame(resultRDD, combinedSchema)

The third argument to StructField ensures that the newly created fields are nullable. It defaults to true, so you don't strictly have to include it, but I do here for clarity, since the whole point of using this method is to create specifically nullable fields.
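For what it's worth, one illustrative way to fill in the elided resultRDD step (an assumption on my part; it simply pads every row with a null per new field) would be:

import org.apache.spark.sql.Row

// Hypothetical: append one null per field of newSchema to each existing row,
// so the row layout lines up with combinedSchema.
val resultRDD = df.rdd.map(row => Row.fromSeq(row.toSeq ++ newSchema.map(_ => null)))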

Upvotes: 3
