Reputation: 11
from pyspark.sql.functions import when, length

def validate_shema(df, dic):
    df2 = df.withColumn('typ_freq', when(df.schema["Frequency"].dataType != dic["Frequency"], False).otherwise(True))
    df2 = df2.withColumn('typ_region', when(df2.schema["Region"].dataType != dic["Region"], False).otherwise(True))
    df2.show()
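For context, dic is just a plain Python dictionary of the expected types and lengths, roughly like this (example values only, not my real data):

from pyspark.sql.types import StringType

# example only: expected Spark type per column, plus a max length for "Freq"
dic = {"Frequency": StringType(), "Region": StringType(), "Freq": 10}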
This gives me the error: condition must be a column.
However, when I tried validating the length instead, like df.withColumn("len_freq", when(length(df["Freq"]) > dic["Freq"], False).otherwise(True)), it worked successfully.
Can anyone tell me why the data type check is not working and how to fix it?
Upvotes: 1
Views: 9760
Reputation: 2742
For schema validation in Spark, I would recommend the Cerberus library (https://docs.python-cerberus.org/en/stable/) - there's a great tutorial on using Cerberus with Spark: https://www.waitingforcode.com/apache-spark/validating-json-apache-spark-cerberus/read
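A minimal sketch of that idea, assuming you express your expectations as a Cerberus rule set instead of a dict of Spark DataTypes (the rules and column names below are placeholders, not your real schema):

from cerberus import Validator

# hypothetical rule set: Cerberus validates plain Python values ('string', 'integer', ...)
rules = {"Frequency": {"type": "string", "maxlength": 10},
         "Region": {"type": "string"}}

def row_is_valid(row):
    v = Validator(rules)
    v.allow_unknown = True  # ignore columns we have no rule for
    return v.validate(row.asDict())

# count rows that break the rules (building a Validator per row is simple but slow;
# the linked tutorial shows a mapPartitions variant that reuses one instance)
invalid_count = df.rdd.filter(lambda r: not row_is_valid(r)).count()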
As for why your current code isn't working: when() expects a Column as its condition, but df.schema["Frequency"].dataType != dic["Frequency"] is evaluated on the driver and produces a plain Python boolean. You'll need to get the data type into a column first, probably using the lit function (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit) - something like:
import pyspark.sql.functions as F

# lit() only takes plain Python literals, so store the type as its string form
df = df.withColumn("data_type", F.lit(str(df.schema["Frequency"].dataType)))
df = df.withColumn("typ_freq", F.when(F.col("data_type") != str(dic["Frequency"]), False).otherwise(True))
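Strictly speaking, since both sides of that comparison are already known on the driver, you can also skip when() entirely and wrap the finished boolean in lit() - a minimal sketch, assuming dic maps column names to Spark DataType instances:

import pyspark.sql.functions as F

# the schema check happens once on the driver; lit() just attaches the result as a constant column
df = df.withColumn("typ_freq", F.lit(df.schema["Frequency"].dataType == dic["Frequency"]))
df = df.withColumn("typ_region", F.lit(df.schema["Region"].dataType == dic["Region"]))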
Good luck!
Upvotes: 3