Reputation: 311
I am reading a CSV file with Spark. I provide a schema for the file and read it in PERMISSIVE mode. I would like to keep all malformed records in the column named by columnNameOfCorruptRecord (in my case corrupted_records).
I went through hell to set this up and I still get warnings that I cannot suppress. Is there something I am missing?
First, in order to have the corrupted_records column, I needed to add it to the schema as StringType. This is documented, so okay. But whenever I read a file I get a warning that the schema doesn't match because the number of columns is different. It's just a warning, but it's filling my logs.
Also, when a field is not nullable and a record is corrupted, the record goes to the corrupted_records column and all its fields are set to null, so I get an exception because of the non-nullable field. The only way to solve this is to change the non-nullable columns to nullable, which is quite strange.
Am I missing something?
Recap:
Thanks!
Upvotes: 2
Views: 15864
Reputation: 461
To address your point 2, you first need to look more closely at point 1.
Point 1: analyze your file and make sure your schema maps every field in it. After importing the CSV file into a DataFrame, select the fields you are interested in and continue with what you were doing.
Point 2: you can solve your problem by defining your schema as follows (the code below is PySpark):
import pyspark.sql.types as types

yourSchema = (
    types.StructType()
    .add('field0', types.IntegerType(), True)
    # add all your other fields as .add(fieldName, fieldType, True); True makes the field nullable
    .add('corrupted_records', types.StringType(), True)  # your corrupted data will end up here
)
With this having been defined, you can import your csv file into a DataFrame as follows:
df = (
    spark.read.format("csv")
    .schema(yourSchema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "corrupted_records")
    .load(your_csv_files)
)
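To see why every typed field has to be nullable, here is a minimal plain-Python sketch of the behavior described in the question: a bad row keeps its raw text in corrupted_records and all of its typed fields become None. This is only a simplified model, not Spark's implementation. The SCHEMA, the parse_permissive helper, and the rule that a wrong column count makes a row corrupt are all assumptions made up for the illustration.

```python
import csv

# Hypothetical schema for illustration: (column name, converter) pairs.
SCHEMA = [("id", int), ("price", float)]
CORRUPT_COLUMN = "corrupted_records"


def parse_permissive(text):
    """Parse CSV text without raising on bad rows; keep their raw text instead."""
    rows = []
    for raw_line in text.splitlines():
        if not raw_line.strip():
            continue  # skip blank lines
        tokens = next(csv.reader([raw_line]))
        try:
            # Assumption of this sketch: a wrong column count counts as corrupt.
            if len(tokens) != len(SCHEMA):
                raise ValueError("wrong number of columns")
            row = {name: cast(tok) for (name, cast), tok in zip(SCHEMA, tokens)}
            row[CORRUPT_COLUMN] = None
        except ValueError:
            # Bad row: every typed field becomes None, the raw line is preserved.
            row = {name: None for name, _ in SCHEMA}
            row[CORRUPT_COLUMN] = raw_line
        rows.append(row)
    return rows


rows = parse_permissive("1,9.99\nabc,oops\n2,5.00\n")
for row in rows:
    print(row)
```

The second row cannot be cast, so its id and price come back as None. A field declared non-nullable has no legal value to hold in that case, which is why the schema above marks every field as nullable.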
There are also other ways to do the same operation, and different modes for handling bad data; have a look at this insightful article: https://python.plainenglish.io/how-to-handle-bad-data-in-spark-sql-5e0276d37ca1
Upvotes: 0
Reputation: 666
Here is a demo snippet that reads JSON in PERMISSIVE mode:
df = self.spark.read.option("mode", "PERMISSIVE").option("primitivesAsString", True).json(self.src_path)
Upvotes: 1
Reputation: 184
The following documentation might help. It would be great if you at least provided the code you've written. https://docs.databricks.com/spark/latest/data-sources/read-csv.html
Upvotes: 1