Khyati Wahi

Reputation: 11

Schema validation in Spark using Python

from pyspark.sql.functions import when, length

def validate_schema(df, dic):
    df2 = df.withColumn('typ_freq', when(df.schema["Frequency"].dataType != dic["Frequency"], False).otherwise(True))
    df2 = df2.withColumn('typ_region', when(df.schema["Region"].dataType != dic["Region"], False).otherwise(True))

df2.show()

It's giving me an error - condition must be a column.

Although, when I tried validating the length, like df.withColumn("len_freq", when(length(df["Freq"]) > dic["Freq"], False).otherwise(True)), this worked successfully.

Can anyone explain why the datatype one is not working?

Upvotes: 1

Views: 9760

Answers (1)

danielcahall

Reputation: 2742

For schema validation in Spark, I would recommend the Cerberus library (https://docs.python-cerberus.org/en/stable/) - there's a great tutorial on using Cerberus with Spark: https://www.waitingforcode.com/apache-spark/validating-json-apache-spark-cerberus/read

In terms of why the current solution isn't working, you'll need to convert the condition to deal with column types, probably using the lit function (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit) - something like:

import pyspark.sql.functions as F

# lit() expects a plain Python literal, so stringify the DataType first
df = df.withColumn("data_type", F.lit(str(df.schema["Frequency"].dataType)))
df = df.withColumn('typ_freq', F.when(F.col("data_type") != dic["Frequency"], False).otherwise(True))
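For completeness, here's a minimal runnable sketch of the whole check. This is only an illustration, not the one true approach: it assumes dic maps column names to type names as returned by DataType.simpleString() (e.g. 'string'), and it keeps the comparison in plain Python, wrapping the result in lit to get the Column that withColumn needs - which sidesteps when/otherwise entirely:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# toy data standing in for the asker's DataFrame
df = spark.createDataFrame([("daily", "EMEA")], ["Frequency", "Region"])

# hypothetical expected schema: column name -> simpleString() type name
dic = {"Frequency": "string", "Region": "string"}

def validate_schema(df, dic):
    for name in dic:
        # the actual-vs-expected comparison happens on the driver and
        # yields a plain bool, so wrap it in lit() to produce a Column
        matches = df.schema[name].dataType.simpleString() == dic[name]
        df = df.withColumn("typ_" + name.lower(), F.lit(matches))
    return df

validate_schema(df, dic).show()

Since the schema check is a driver-side constant per column, lit alone is enough here; when/otherwise is only needed when the condition depends on row values (like your length check).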

Good luck!

Upvotes: 3
