domdomegg

Reputation: 1806

How do I check a column always has the same value in Palantir Foundry?

I have a dataset with a column that I always expect to have the same value. It indicates a data schema version, so I know I am deserialising my data correctly. How can I ensure I am alerted if there is a value with a different schema version?

data             version
{"key":"value"}  1
{"key":"value2"} 1

(if there is a row where version != 1, I want to alert)

Upvotes: 0

Views: 272

Answers (2)

domdomegg

Reputation: 1806

You can do this with Spark's assert_true function. Note that if you apply it to a column that is later dropped, Spark will 'optimise' the expectation away, so coalesce it with a column that contributes to the output.

For example:

from transforms.api import transform_df, Input, Output
from pyspark.sql import functions as F, types as T


@transform_df(
    Output("/path/to/output"),
    source_df=Input("/path/to/input"),
)
def compute(source_df):
    return (
        source_df
        # assert the version is consistent
        .withColumn("data", F.coalesce(F.expr("assert_true(version = 1)"), "data"))
        .drop("version")
        # parse the data, knowing we can expect the correct schema
        .withColumn("data", F.from_json(F.col("data"), T.StructType([
            T.StructField("key", T.StringType())
        ])))
    )

Upvotes: 0

domdomegg

Reputation: 1806

You can do this using data expectations, if validating the version at the beginning or end of a transform suits your needs. For example:

from transforms.api import transform_df, Input, Output, Check
from pyspark.sql import functions as F, types as T
from transforms import expectations as E


@transform_df(
    Output("/path/to/output"),
    source_df=Input("/path/to/input", checks=[
        # assert the version is consistent
        # usually you'd put checks on the output dataset
        # so this might be better placed where the input is created
        Check(E.col("version").equals(1), "version: equals 1")
    ]),
)
def compute(source_df):
    return (
        source_df
        .drop("version")
        # parse the data, knowing we can expect the correct schema
        .withColumn("data", F.from_json(F.col("data"), T.StructType([
            T.StructField("key", T.StringType())
        ])))
    )

Upvotes: 0
