Reputation: 519
I'm trying to use the PySpark CSV reader with a user-defined schema while keeping the corrupted records. Here is what I have tried.
file: ab.csv
------
a,b
1,2
3,four
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
DDL = "a INTEGER, b INTEGER"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=False,
                    columnNameOfCorruptRecord='broken')
df.show()
Output:
+----+----+
| a| b|
+----+----+
| 1| 2|
|null|null|
+----+----+
This command does not store the corrupted records. If I add broken to the schema and drop the header validation (enforceSchema=True), the command works, but with a warning.
DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL, enforceSchema=True,
                    columnNameOfCorruptRecord='broken')
df.show()
Output:
WARN CSVDataSource:66 - Number of column in CSV header is not equal to number of fields in the schema:
Header length: 2, schema size: 3
CSV file: file:/// ... /ab.csv
+----+----+------+
| a| b|broken|
+----+----+------+
| 1| 2| null|
|null|null|3,four|
+----+----+------+
Is this the intended behavior, or is there a bug that breaks the first example? Is there a better way to do this?
One more thing: I want to process the well-formed fields in corrupted records to get a dataframe like this:
+--+----+------+
| a| b|broken|
+--+----+------+
| 1| 2| null|
| 3|null|3,four|
+--+----+------+
Should I add an extra post-read step to get that, or is there some option I have missed that would be more permissive?
Upvotes: 13
Views: 13750
Reputation: 309
My team was struggling with the same issue, so we checked the file. The format error turned out to be text containing \n characters inside the data, within quoted (\") fields. Once we inspected the file in Atom we spotted the problem, and the solution was simply to force the reader to handle multi-line records.
df = spark.read.options(header=True, sep='|').csv("my_csv_file", multiLine=True)
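For illustration, here is a minimal sketch of the kind of record that needs multiLine=True: a quoted field with an embedded newline (the file name notes.csv and its contents are made up for this example).
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# hypothetical file: the quoted "note" value spans two physical lines
with open('notes.csv', 'w') as f:
    f.write('id|note\n1|"first line\nsecond line"\n')

# without multiLine=True the second physical line is treated as a new record
df = spark.read.options(header=True, sep='|').csv('notes.csv', multiLine=True)
df.show(truncate=False)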
Upvotes: 1
Reputation: 351
Like @deo said, when using columnNameOfCorruptRecord, Spark will implicitly create the column and then drop it during parsing. In order to keep the column, you need to explicitly add it to your schema. Note that this behavior also depends on what mode you specify when reading. See this snippet about the mode param in the Spark documentation:
PERMISSIVE : when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord, and sets other fields to null. To keep corrupt records, an user can set a string type field named columnNameOfCorruptRecord in an user-defined schema. If a schema does not have the field, it drops corrupt records during parsing. A record with less/more tokens than schema is not a corrupted record to CSV. When it meets a record having fewer tokens than the length of the schema, sets null to extra fields. When the record has more tokens than the length of the schema, it drops extra tokens.
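Applied to the question's ab.csv, that means keeping the corrupt-record column in the user-defined schema; a sketch (untested, reusing the names from the question):
DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL,
                    mode='PERMISSIVE',  # the default; malformed rows land in 'broken'
                    columnNameOfCorruptRecord='broken')
df.show()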
Upvotes: 2
Reputation: 936
That's the correct default behavior. If you are inferring the schema, Spark implicitly adds a columnNameOfCorruptRecord field to the output schema; otherwise you have to provide a string-type field named columnNameOfCorruptRecord in your user-defined schema, or change the column name (e.g. broken) and add that same name to the schema.
There is no option to process the data partially as you describe; for that to happen you would need to write your own custom parser extending CSVFileFormat in Spark. For a list of all CSV options, check org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala.
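If a custom parser is more than you need, the extra post-read step mentioned in the question is one way to go: re-split the raw line stored in broken and fall back to its tokens where the typed columns came out null. A rough sketch (column names taken from the question; it assumes plain comma-separated values with no quoted commas):
from pyspark.sql import functions as F

DDL = "a INTEGER, b INTEGER, broken STRING"
df = spark.read.csv('ab.csv', header=True, schema=DDL,
                    columnNameOfCorruptRecord='broken')

# split the raw corrupted line and keep whatever tokens still cast cleanly
tokens = F.split(F.col('broken'), ',')
fixed = df.select(
    F.coalesce(F.col('a'), tokens.getItem(0).cast('int')).alias('a'),
    F.coalesce(F.col('b'), tokens.getItem(1).cast('int')).alias('b'),
    F.col('broken'))
fixed.show()
For the sample file this yields the dataframe shown in the question: a = 3 is recovered from the corrupted row and b stays null.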
Upvotes: 1