MitakaJ9

Reputation: 311

Spark: reading files with PERMISSIVE and provided schema - issues with corrupted records column

I am reading a CSV file with Spark. I provide a schema for the file and read it in PERMISSIVE mode. I would like to keep all malformed records in the columnNameOfCorruptRecord column (in my case corrupted_records).

I went through a lot of trouble to set this up and I still get warnings that I cannot suppress. Is there something I'm missing?

So first, in order to have the corrupted_records column I needed to add it to the schema as StringType. This is documented, so okay. But whenever I read a file I get a warning that the schema doesn't match because the number of columns is different. It's just a warning, but it's filling my logs.

Also, when a field is not nullable and a record is corrupted, the corrupted record goes to the corrupted_records column and all of its other fields are set to null, so I get an exception because of the non-nullable field. The only way I found to solve this is to make the non-nullable columns nullable, which is quite strange.
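For reference, this is roughly what my setup looks like (field names and path simplified):

    import pyspark.sql.types as T

    # Simplified: one non-nullable field plus the corrupted_records
    # column added to the schema as StringType.
    schema = (T.StructType()
              .add("id", T.IntegerType(), False)    # non-nullable field
              .add("name", T.StringType(), True)
              .add("corrupted_records", T.StringType(), True))

    df = (spark.read.format("csv")
          .schema(schema)
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "corrupted_records")
          .load("/path/to/file.csv"))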

Am I missing something?

Recap:

  1. Is there a way to suppress the warning when I've added the corrupted_records column to the schema?
  2. Is there a way to use PERMISSIVE mode and a corrupted_records column with a schema that has non-nullable fields?

Thanks!

Upvotes: 2

Views: 15864

Answers (3)

Andrea Baldino

Reputation: 461

To answer your point 2, you first need to look more closely at point 1.

Point 1: analyze your file and map your schema to all the fields it contains. After importing your CSV file into a DataFrame, select the fields of interest and continue with what you were doing.

Point 2: you can solve your problem by defining your schema as follows (here in PySpark):

    import pyspark.sql.types as types

    yourSchema = (types.StructType()
                  .add('field0', types.IntegerType(), True)
                  # ... add all your other fields with nullable=True so they can be set to null
                  .add('corrupted_records', types.StringType(), True)  # your corrupted data will be here
                  )

With the schema defined, you can import your CSV file into a DataFrame as follows:

    df = (spark.read.format("csv")
             .schema(yourSchema)
             .option("mode", "PERMISSIVE")
             .option("columnNameOfCorruptRecord", "corrupted_records")
             .load(your_csv_files)
         )
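Once loaded this way, the raw text of malformed rows is kept in corrupted_records, so you can separate good and bad rows. A minimal sketch (recent Spark versions may require caching the DataFrame before filtering on the corrupt record column alone):

    # Cache first: Spark disallows queries on a raw CSV/JSON read that
    # reference only the internal corrupt record column.
    df.cache()
    good_rows = df.filter(df["corrupted_records"].isNull())
    bad_rows = df.filter(df["corrupted_records"].isNotNull())
    bad_rows.show(truncate=False)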

There are also other ways to do the same operation and different ways to handle bad data; have a look at this insightful article: https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1

Upvotes: 0

geosmart

Reputation: 666

A demo code snippet for reading JSON:

    df = (self.spark.read
          .option("mode", "PERMISSIVE")
          .option("primitivesAsString", True)
          .json(self.src_path))
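If you want the malformed rows in a column named corrupted_records (as in the question) rather than the default _corrupt_record, the same option as for CSV should work. A rough standalone sketch, assuming a plain SparkSession instead of the self.* attributes above and a hypothetical input path:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Malformed JSON lines end up in the column named by
    # columnNameOfCorruptRecord instead of the default _corrupt_record.
    df = (spark.read
          .option("mode", "PERMISSIVE")
          .option("columnNameOfCorruptRecord", "corrupted_records")
          .option("primitivesAsString", True)
          .json("/path/to/input.json"))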

Upvotes: 1

Shubhanshu

Reputation: 184

The following documentation might help: https://docs.databricks.com/spark/latest/data-sources/read-csv.html. It would be great if you could at least provide the code you've written.

Upvotes: 1
