Reputation: 470
The documentation for Spark's Scala DataFrameReader.csv method suggests that Spark can log the malformed rows detected while reading a .csv file.
- How can one log the malformed rows?
- Can one obtain a val or var containing the malformed rows?
The option from the linked documentation is: maxMalformedLogPerPartition (default 10): sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored.
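A minimal sketch of passing that option (assuming a SparkSession named spark, a Spark 2.x build where maxMalformedLogPerPartition is still documented, and a hypothetical file path); the malformed-row messages end up in Spark's logs, not in the returned DataFrame:
# Sketch: pass maxMalformedLogPerPartition like any other CSV reader option.
# Assumes an existing SparkSession `spark` and a Spark 2.x version that documents this option;
# the path is hypothetical.
df = (spark.read
      .option("mode", "PERMISSIVE")                 # keep reading past malformed rows
      .option("maxMalformedLogPerPartition", 10)    # log at most 10 malformed rows per partition
      .csv("/path/to/file.csv"))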
Upvotes: 6
Views: 2815
Reputation: 321
I've expanded on klucar's answer here: load the CSV, build a schema from the non-corrupted records, add the corrupted-record column, use the new schema to load the CSV again, and then look for corrupted records.
from pyspark.sql.types import StructField, StringType
from pyspark.sql.functions import col

file_path = "/path/to/file"
mode = "PERMISSIVE"

# First pass: infer the schema from the file as-is
schema = spark.read.options(mode=mode).csv(file_path).schema
# Add the special column that PERMISSIVE mode fills with the raw text of malformed rows
schema = schema.add(StructField("_corrupt_record", StringType(), True))

# Second pass: re-read the file with the extended schema
df = spark.read.options(mode=mode).schema(schema).csv(file_path)
df.cache()   # cache so the corrupt-record column can be filtered without re-reading the source
df.count()   # force evaluation
df.filter(col("_corrupt_record").isNotNull()).show()
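For the second part of the question (getting the malformed rows into a variable), the same filter can simply be assigned; collecting it to the driver is only sensible for small result sets. A sketch building on the df above:
corrupt_df = df.filter(col("_corrupt_record").isNotNull())       # DataFrame of malformed rows
corrupt_rows = corrupt_df.select("_corrupt_record").collect()    # list of Rows on the driver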
Upvotes: 0
Reputation: 313
Based on this Databricks example, you need to explicitly add the "_corrupt_record" column to the schema definition when you read in the file. Something like this worked for me in pyspark 2.4.4:
from pyspark.sql.types import StructType, StructField, StringType

my_schema = StructType([
    StructField("field1", StringType(), True),
    ...
    StructField("_corrupt_record", StringType(), True)  # receives the raw text of malformed rows
])

my_data = spark.read.format("csv")\
    .option("path", "/path/to/file.csv")\
    .schema(my_schema)\
    .load()

my_data.count()  # force reading the csv
corrupt_lines = my_data.filter("_corrupt_record is not NULL")
corrupt_lines.take(5)
Upvotes: 2
Reputation: 5440
If you are using Spark 2.3, check the _corrupt_record special column. According to several Spark discussions "it should work", so after the read, filter the rows where that column is non-empty - those should be your errors. You could also check the input_file_name() SQL function.
If you are using a version lower than 2.3, you should implement a custom read-and-record solution, because according to my tests _corrupt_record does not work for the csv data source.
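A hedged sketch of the input_file_name() suggestion (assuming a DataFrame df that was read as in the answers above and has a _corrupt_record column): the function can be selected alongside the corrupt records to see which source file each bad row came from.
from pyspark.sql.functions import col, input_file_name

# Show each malformed row together with the file it came from
(df.filter(col("_corrupt_record").isNotNull())
   .select(input_file_name().alias("source_file"), col("_corrupt_record"))
   .show(truncate=False))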
Upvotes: 0