Reputation: 1806
I have a dataset with a column that I always expect to have the same value. It indicates a data schema version, so I know I am deserialising my data correctly. How can I ensure I am alerted if there is a value with a different schema version?
| data | version |
|---|---|
| {"key":"value"} | 1 |
| {"key":"value2"} | 1 |
(if there is a row where version != 1, I want to alert)
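For reference, a minimal sketch of this example as a PySpark DataFrame (column names taken from the table above; the string/integer types are assumed):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('{"key":"value"}', 1), ('{"key":"value2"}', 1)],
    ["data", "version"],
)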
Upvotes: 0
Views: 272
Reputation: 1806
You can do this with Spark's assert_true function. One caveat: if the assertion only references a column that is later dropped, Spark's optimiser may prune the expectation away, so coalesce it with a column that does contribute to the output.
For example:
from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T


@transform_df(
    Output("/path/to/output"),
    source_df=Input("/path/to/input"),
)
def compute(source_df):
    return (
        source_df
        # assert the version is consistent; coalescing with "data" stops the
        # optimiser pruning the assertion once "version" is dropped
        .withColumn("data", F.coalesce(F.expr("assert_true(version = '1')"), "data"))
        .drop("version")
        # parse the data, knowing we can expect the correct schema
        .withColumn("data", F.from_json(F.col("data"), T.StructType([
            T.StructField("key", T.StringType()),
        ])))
    )
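If a row with an unexpected version slips through, assert_true raises java.lang.RuntimeException at runtime, failing the build. A minimal sketch of that failure mode, assuming a local SparkSession and hypothetical in-memory data:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [('{"key":"value"}', 1), ('{"key":"value2"}', 2)],  # second row has a bad version
    ["data", "version"],
)
# evaluating the bad row raises java.lang.RuntimeException
df.withColumn(
    "data", F.coalesce(F.expr("assert_true(version = '1')"), "data")
).collect()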
Upvotes: 0
Reputation: 1806
You can do this with data expectations, if validating the version at the beginning or end of a transform suits your needs. For example:
from transforms.api import transform_df, Input, Output, Check
from pyspark.sql import functions as F, types as T
from transforms import expectations as E


@transform_df(
    Output("/path/to/output"),
    source_df=Input("/path/to/input", checks=[
        # assert the version is consistent
        # usually you'd put checks on the output dataset,
        # so this might be better placed where the input is created
        Check(E.col("version").equals(1), "version: equals 1"),
    ]),
)
def compute(source_df):
    return (
        source_df
        .drop("version")
        # parse the data, knowing we can expect the correct schema
        .withColumn("data", F.from_json(F.col("data"), T.StructType([
            T.StructField("key", T.StringType()),
        ])))
    )
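Following the comment in the code above, a sketch of the same check attached to the Output of the transform that produces /path/to/input, so the version is validated where the data is written (the upstream dataset path and transform body here are hypothetical):

from transforms.api import transform_df, Input, Output, Check
from transforms import expectations as E


@transform_df(
    Output("/path/to/input", checks=[
        Check(E.col("version").equals(1), "version: equals 1"),
    ]),
    raw_df=Input("/path/to/raw"),  # hypothetical upstream dataset
)
def produce(raw_df):
    return raw_df  # whatever logic actually writes the version column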
Upvotes: 0