Reputation: 127
I am reading a CSV file using Spark in Scala. The schema is predefined and I am using it for reading. This is the example code:
// create the schema
val schema = StructType(Array(
  StructField("col1", IntegerType, false),
  StructField("col2", StringType, false),
  StructField("col3", StringType, true)))
// Initialize Spark session
val spark: SparkSession = SparkSession.builder
.appName("Parquet Converter")
.getOrCreate
// Create a data frame from a csv file
val dataFrame: DataFrame = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .load(inputCsvPath)
From what I read, when reading a CSV with Spark using a schema there are three mode options:
DROPMALFORMED --> this will drop the lines that don't match the schema
PERMISSIVE --> this will set the whole line to null values
FAILFAST --> this will throw an exception when a mismatch is discovered
What is the best way to combine the options? The behaviour I want is to get the mismatches in the schema, print them as errors and ignore those lines in my data frame. Basically, I want a combination of FAILFAST and DROPMALFORMED.
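For illustration, each mode is selected per read via the mode option. A minimal sketch reusing the schema and inputCsvPath above (strictDf is just an illustrative name):
val strictDf = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "FAILFAST") // throws as soon as a row fails to match the schema
  .load(inputCsvPath)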
Thanks in advance
Upvotes: 5
Views: 8868
Reputation:
Just use DROPMALFORMED and follow the log. If malformed records are present, they are dumped to the log, up to the limit set by the maxMalformedLogPerPartition option.
spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "DROPMALFORMED")            // silently drop rows that don't fit the schema
  .option("maxMalformedLogPerPartition", 128) // log at most 128 malformed records per partition
  .load(inputCsvPath)
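If you also want a rough count of how many lines were dropped, one option is to compare against the raw line count. A sketch (it re-reads the file, reuses inputCsvPath from the question, and assumes no header and no blank lines; parsed is just an illustrative name):
val parsed = spark.read.format("csv")
  .schema(schema)
  .option("header", false)
  .option("mode", "DROPMALFORMED")
  .load(inputCsvPath)
// Raw text lines minus surviving rows = number of dropped lines
val rawCount = spark.read.textFile(inputCsvPath).count()
val keptCount = parsed.count()
println(s"Dropped ${rawCount - keptCount} malformed line(s)")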
Upvotes: 0
Reputation: 127
This is what I eventually did:
I added the "_corrupt_record" column to the schema, for example:
val schema = StructType(Array(
  StructField("col1", IntegerType, true),
  StructField("col2", StringType, false),
  StructField("col3", StringType, true),
  StructField("_corrupt_record", StringType, true)))
Then I read the CSV using PERMISSIVE mode (it is Spark's default):
val dataFrame: DataFrame = spark.read.format("csv")
.schema(schema)
.option("header", false)
.option("mode", "PERMISSIVE")
.load(inputCsvPath)
Now my data frame has an additional column that holds the raw text of any row with a schema mismatch. I filtered out the mismatched rows and printed them:
val badRows = dataFrame.filter("_corrupt_record is not null")
badRows.cache() // cache so the CSV isn't re-parsed for each action below
badRows.show()
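To also ignore those lines in the resulting data frame, as the question asks, the complement filter works. A sketch (goodRows is just an illustrative name):
val goodRows = dataFrame
  .filter("_corrupt_record is null") // keep only rows that parsed cleanly
  .drop("_corrupt_record")           // remove the helper column from the result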
Upvotes: 3