simon_dmorias

Reputation: 2483

Validate NULL values from parquet files

I'm reading parquet files from a third party. It seems that Spark always reads a Parquet schema with nullable columns, regardless of how the files were written.

When reading these files I would like to reject any file that contains a NULL value in a particular column. With CSV or JSON you can do:

schema = StructType([StructField("id", IntegerType(), False), StructField("col1", IntegerType(), False)])
df = spark.read.format("csv").schema(schema).option("mode", "FAILFAST").load(myPath)

And the load will be rejected if it contains a NULL in col1. If you try this with Parquet, the file is accepted.

I could filter or count the column for NULL values and raise an error, but from a performance standpoint that is terrible because it adds an extra stage to the job. It would also reject the complete DataFrame and all files (yes, the CSV route does this as well).

Is there any way to enforce validation on the files at read time?

I'm using Spark 3, if that helps.

Edit with example:

from pyspark.sql.types import *
schema = StructType([
  StructField("Id", IntegerType(), False),
  StructField("col1", IntegerType(), True)
])
df = spark.createDataFrame([(1,1),(2, None)], schema)
df.write.format("parquet").mode("overwrite").save("/tmp/parquetValidation/")
df2 = spark.read.format("parquet").load("/tmp/parquetValidation/")
df2.printSchema()

Returns

|-- Id: integer (nullable = true) 
|-- col1: integer (nullable = true)

Re-read the file with a schema blocking nulls:

schema = StructType([
  StructField("Id", IntegerType(), False),
  StructField("col1", IntegerType(), False)
])
df3 = spark.read.format("parquet").schema(schema).option("mode", "FAILFAST").load("/tmp/parquetValidation/")
df3.printSchema()

Returns:

|-- Id: integer (nullable = true) 
|-- col1: integer (nullable = true)

I.e. the nullability in the schema is not applied.

Upvotes: 0

Views: 3627

Answers (1)

simon_dmorias

Reputation: 2483

Thanks to @Sasa in the comments on the question.

from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([
  StructField("Id", IntegerType(), False),
  StructField("col1", IntegerType(), False)
])

df_junk = spark.read.format("parquet").schema(schema).load("/tmp/parquetValidation/")

# Rebuild the DataFrame on the JVM side with the non-nullable schema,
# which is not discarded the way it is on read.
new_java_schema = spark._jvm.org.apache.spark.sql.types.DataType.fromJson(schema.json())
java_rdd = df_junk._jdf.toJavaRDD()
new_jdf = spark._jsparkSession.createDataFrame(java_rdd, new_java_schema)
df_validate = DataFrame(new_jdf, df_junk.sql_ctx)

df_validate.printSchema()

Returns

|-- Id: integer (nullable = false)
|-- col1: integer (nullable = false)

And running an action causes:

java.lang.RuntimeException: The 1th field 'col1' of input row cannot be null.

It's not nice dropping to a Java RDD, but it works.

Upvotes: 0
