hbit

Reputation: 969

Pipeline generation - passing in simple data structures like lists/arrays

For a code repository project in Palantir Foundry, I am struggling with re-using some of my transformation logic.

It seems almost trivial, but is there a way to pass an Input to a Transform that is not a dataset/dataframe reference?

In my case I want to pass in strings or lists/arrays.

This is my code:

from pyspark.sql import functions as F
from transforms.api import Transform, Input, Output


def my_computation(result, customFilter, scope, my_categories, my_mappings):
    scope_df = scope.dataframe()
    my_categories_df = my_categories.dataframe()
    my_mappings_df = my_mappings.dataframe()
 
    filtered_cat_df = (
        my_categories_df
        .filter(F.col('CAT_NAME').isin(customFilter))
    )
 
    # ... more logic
 
 
def generateTransforms(config):
    transforms = []
 
    for key, value in config.items():
        o = {}
        for outKey, outValue in value['outputs'].items():
            o[outKey] = Output(outValue)
 
        i = {}
        for inpKey, inpValue in value['inputs'].items():
            i[inpKey] = Input(inpValue)
 
        i['customFilter'] = Input(value['my_custom_filter'])
 
        transforms.append(Transform(my_computation, inputs=i, outputs=o))
 
    return transforms
 
 
config = {
    "transform_one": {
        "my_custom_filter": {
            "foo",
            "bar"
        },
        "inputs": {
            "scope": "/my-project/input/scope",
            "my_categories": "/my-project/input/my_categories",
            "my_mappings": "/my-project/input/my_mappings"
        },
        "outputs": {
            "result": "/my-project/output/result"
        }
    }
}
 
TRANSFORMS = generateTransforms(config)

The concrete question is: how can I send in the values from my_custom_filter into customFilter in the transformation function my_computation?

If I run it as above, I get the error "TypeError: unhashable type: 'set'".

Upvotes: 0

Views: 139

Answers (1)

fmsf

Reputation: 37137

This looks like a Python issue. Could you point out which line is causing the error?

Reading through your code, I would guess it's this line:

i['customFilter'] = Input(value['my_custom_filter'])

Your Python logic is wrong: if we substitute the config value, you're effectively making this call, which hands Input a set where it expects a dataset path:

i['customFilter'] = Input({"foo", "bar"})
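The error message itself is plain Python behaviour: a set is mutable and therefore unhashable, so any code that hashes it (a dict key, a set member, or a direct hash() call) raises exactly this TypeError. That Input hashes its argument internally is only an assumption inferred from the message; the sketch below reproduces the Python side without any Foundry imports, and registry is a hypothetical stand-in.

```python
# The question's config builds a set literal, not a list
custom_filter = {"foo", "bar"}

# Sets are mutable, so hash() (used for dict keys and set members) rejects them
try:
    hash(custom_filter)
    error = None
except TypeError as exc:
    error = str(exc)

print(error)

# Immutable containers holding the same values are hashable
registry = {}  # hypothetical stand-in for any dict keyed by its argument
registry[frozenset(custom_filter)] = "ok"
registry[tuple(sorted(custom_filter))] = "ok"
```

Swapping in a frozenset would silence this particular error, but Input would presumably still fail because it expects a dataset path, so the filter values are better kept out of the inputs dict altogether.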

Edit, to answer the comment on how to create a Python transform that locks a variable in a closure:

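from pyspark.sql import functions as F
from transforms.api import transform


def create_transform(inputs, outputs, my_other_var):
    # inputs/outputs map parameter names to Input(...)/Output(...) objects;
    # my_other_var is plain Python (a string, list, ...) captured by the closure
    @transform(**inputs, **outputs)
    def compute(input_foo, input_bar, output_foobar, ctx):
        df = input_foo.dataframe()
        df = df.withColumn("mycol", F.lit(my_other_var))
        output_foobar.write_dataframe(df)

    return compute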

Now you can call it like this:

transforms.append(create_transform(inputs, outputs, "foobar"))
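Why use a factory function instead of defining the transform directly inside the loop? Python closures look up captured variables when the inner function runs, not when it is defined, so functions created in a loop all see the loop variable's final value. The pure-Python sketch below (no Foundry imports; make_compute, build_inline, build_with_factory and configs are hypothetical names) shows the pitfall and how passing the value through a factory parameter, as create_transform does with my_other_var, avoids it.

```python
def build_inline(configs):
    """Pitfall: every closure reads custom_filter after the loop has finished."""
    computes = []
    for custom_filter in configs:
        def compute(values):
            return [v for v in values if v in custom_filter]
        computes.append(compute)
    return computes


def make_compute(custom_filter):
    """Factory: each call gets its own custom_filter binding."""
    def compute(values):
        return [v for v in values if v in custom_filter]
    return compute


def build_with_factory(configs):
    return [make_compute(custom_filter) for custom_filter in configs]


configs = [["foo", "bar"], ["baz"]]
values = ["foo", "bar", "baz"]

print([c(values) for c in build_inline(configs)])        # [['baz'], ['baz']]
print([c(values) for c in build_with_factory(configs)])  # [['foo', 'bar'], ['baz']]
```

Applied to the question, this suggests calling a factory like create_transform with value['my_custom_filter'] inside the generateTransforms loop, rather than wrapping the filter in Input.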

Upvotes: 1
