Ben Hoffman

Reputation: 127

Handling schema mismatches in Spark

I am reading a CSV file using Spark in Scala. The schema is predefined and I am using it for reading. This is the example code:

// create the schema (imports shown for completeness)
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Array(
      StructField("col1", IntegerType, false),
      StructField("col2", StringType, false),
      StructField("col3", StringType, true)))

// Initialize Spark session
val spark: SparkSession = SparkSession.builder
    .appName("Parquet Converter")
    .getOrCreate

// Create a data frame from a csv file
val dataFrame: DataFrame = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .load(inputCsvPath)

From what I read, when reading a CSV with Spark using a schema there are 3 options:

  1. Set mode to DROPMALFORMED --> this will drop the lines that don't match the schema
  2. Set mode to PERMISSIVE --> this will set the whole line to null values
  3. Set mode to FAILFAST --> this will throw an exception when a mismatch is discovered
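
For reference, each mode is passed to the reader through the mode option; a minimal sketch using the schema and path from above (failFastDf is just an illustrative name):

// For example, FAILFAST is selected like this; the other two modes
// only change the value of the "mode" option
val failFastDf = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "FAILFAST")
  .load(inputCsvPath)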

What is the best way to combine the options? The behaviour I want is to get the schema mismatches, print them as errors, and ignore those lines in my data frame. Basically, I want a combination of FAILFAST and DROPMALFORMED.

Thanks in advance

Upvotes: 5

Views: 8868

Answers (2)

user10651087

Reputation:

Just use DROPMALFORMED and follow the log. If malformed records are present, they are dumped to the log, up to the limit set by the maxMalformedLogPerPartition option.

spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "DROPMALFORMED")
  .option("maxMalformedLogPerPartition", 128)
  .load(inputCsvPath)
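
If you also want a rough measure of how many lines were dropped, one possible complement (my own assumption, not something DROPMALFORMED reports directly) is to compare the parsed row count with the raw line count:

// Sketch: count dropped lines by comparing raw lines with parsed rows.
// Assumes there is no header row; subtract 1 from rawLines if there is one.
val parsed = spark.read.format("csv")
  .schema(schema)
  .option("mode", "DROPMALFORMED")
  .load(inputCsvPath)

val rawLines = spark.read.textFile(inputCsvPath).count()
val dropped = rawLines - parsed.count()
println(s"Dropped $dropped malformed line(s)")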

Upvotes: 0

Ben Hoffman

Reputation: 127

This is what I eventually did:
I added the "_corrupt_record" column to the schema, for example:

val schema = StructType(Array(
    StructField("col1", IntegerType, true),
    StructField("col2", StringType, false),
    StructField("col3", StringType, true),
    StructField("_corrupt_record", StringType, true)))

Then I read the CSV using PERMISSIVE mode (it is Spark's default):

val dataFrame: DataFrame = spark.read.format("csv")
                                .schema(schema)
                                .option("header", false)
                                .option("mode", "PERMISSIVE")
                                .load(inputCsvPath)

Now my data frame holds an additional column that contains the raw content of any row with a schema mismatch. I filtered the rows that have mismatched data and printed them:

val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache()
badRows.show()
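
To fully match the behaviour asked for in the question (report the bad rows but keep only the clean ones), the same data frame can then be filtered the other way round; a short follow-up sketch (goodRows is just an illustrative name):

// Keep only the rows that parsed cleanly and drop the helper column
val goodRows = dataFrame
  .filter("_corrupt_record is null")
  .drop("_corrupt_record")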

Upvotes: 3
