Sushant Pailwan

Reputation: 49

Get the column names of malformed records while reading a csv file using pyspark

I am reading a csv file using pyspark with predefined schema.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", FloatType(), True)
])

df = (spark.read
    .schema(schema)
    .option("header", True)
    .option("delimiter", ",")
    .csv(path))

Now, in the csv file there is a float value in col1 and a string value in col3. I need to raise an exception and get the names of these columns (col1, col3), because they contain values of a different data type than the one defined in the schema.

How do I achieve this?

Upvotes: 2

Views: 1915

Answers (1)

Dave

Reputation: 2059

In PySpark versions > 2.2 you can use columnNameOfCorruptRecord with the CSV reader:

schema = StructType(
    [
        StructField("col1", IntegerType(), True),
        StructField("col2", StringType(), True),
        StructField("col3", FloatType(), True),
        StructField("corrupted", StringType(), True),
    ]
)

df = spark.read.csv(
    path,
    schema=schema,
    header=True,
    sep=",",
    mode="PERMISSIVE",
    columnNameOfCorruptRecord="corrupted",
)
df.show()

+----+----+----+------------+
|col1|col2|col3|   corrupted|
+----+----+----+------------+
|null|null|null|0.10,123,abc|
+----+----+----+------------+
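
If you then want to raise an exception whenever any record fails to match the schema (as the question asks), a minimal sketch could look like the following. It caches the DataFrame first, since recent Spark versions disallow queries on raw CSV data that reference only the internal corrupt-record column; the column name "corrupted" matches the schema above, and the error message is just an example.

from pyspark.sql.functions import col

# Cache first: Spark >= 2.3 rejects queries on raw CSV data that reference
# only the internal corrupt-record column.
df.cache()

# Rows that could not be parsed against the schema end up with a non-null
# value in the "corrupted" column.
bad_rows = df.filter(col("corrupted").isNotNull())
if bad_rows.count() > 0:
    bad_rows.show(truncate=False)
    raise ValueError("CSV contains records that do not match the schema")

Note this tells you which records are malformed, not which individual columns, for the reason explained below.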

EDIT: CSV record fields are not independent of one another, so it can't generally be said that one field is corrupt but the others are not. Only the entire record can be corrupt or not corrupt.

For example, suppose we have a comma-delimited file with one row and two floating-point columns containing the Euro values 0,10 and 1,00. The file looks like this:

col1,col2
0,10,1,00

Which field is corrupt?

Upvotes: 2
