Reputation: 33
I am using pyspark to load the data from csv file into a dataframe and I was able to load the data while dropping the malformed records but how can I reject these bad (malformed) records from the csv file and save these rejected records in a new file?
Upvotes: 3
Views: 7801
Reputation: 9701
Here is an idea, although I am not very happy about it. The CSV parser has different modes, as you know, to drop malformed data. However, if no mode is specified, it 'fills the blanks' with a default null
value. You can use that to your advantage.
Using this data, and assuming that the column article_id
is not nullable by design:
1,abcd,correct record1,description1 haha
Bad record,Bad record description
3,hijk,another correct record,description2
Not_An_Integer,article,no integer type,description
Here is the code:
#!/usr/bin/env python
# coding: utf-8
import pyspark
from pyspark.sql.types import *
from pyspark.sql import Row, functions as F
sc = pyspark.SparkContext.getOrCreate()
spark = pyspark.sql.SparkSession(sc)
# Load the data with your schema, drop the malformed information
schema = StructType([ StructField("article_id", IntegerType()),
StructField("title", StringType()),
StructField("short_desc", StringType()),
StructField("article_desc", StringType())])
valid_data = spark.read.format("csv").schema(schema).option("mode","DROPMALFORMED").load("./data.csv")
valid_data.show()
"""
+----------+-----+--------------------+-----------------+
|article_id|title| short_desc| article_desc|
+----------+-----+--------------------+-----------------+
| 1| abcd| correct record1|description1 haha|
| 3| hijk|another correct r...| description2|
+----------+-----+--------------------+-----------------+
"""
# Load the data and let spark infer everything
malformed_data = spark.read.format("csv").option("header", "false").load("./data.csv")
malformed_data.show()
"""
+--------------+--------------------+--------------------+-----------------+
| _c0| _c1| _c2| _c3|
+--------------+--------------------+--------------------+-----------------+
| 1| abcd| correct record1|description1 haha|
| Bad record|Bad record descri...| null| null|
| 3| hijk|another correct r...| description2|
|Not_An_Integer| article| no integer type| description|
+--------------+--------------------+--------------------+-----------------+
"""
# Join and keep all data from the 'malformed' DataFrame.
merged = valid_data.join(malformed_data, on=valid_data.article_id == malformed_data._c0, how="right")
# Filter those records for which a matching with the 'valid' data was not possible
malformed = merged.where(F.isnull(merged.article_id))
malformed.show()
"""
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
|article_id|title|short_desc|article_desc| _c0| _c1| _c2| _c3|
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
| null| null| null| null| Bad record|Bad record descri...| null| null|
| null| null| null| null|Not_An_Integer| article|no integer type|description|
+----------+-----+----------+------------+--------------+--------------------+---------------+-----------+
"""
I am not too fond of this, as it is very sensitive to how Spark parses the CSV and it might not work for all files, but you might find it useful.
Upvotes: 3