Paolo

Reputation: 1

Palantir Foundry repository pyspark @transform problem

I have a problem with a PySpark transform in a Foundry repository. Until now I have been using an incremental @transform that, each month, appends that month's data to a historical "_h" table. Now I would like to do one more thing: if the "reference_dt" of the month's table already exists in the historical table (for example, when a month is re-run), delete the historical rows for that date and append the new data in their place.
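To make the intended rule concrete, here is a minimal plain-Python sketch (not Foundry or PySpark code), assuming each row is a dict with a "reference_dt" key. The helper name replace_reference_dates is hypothetical: it drops every historical row whose date is being reloaded, then appends the new rows.

```python
from __future__ import annotations

from datetime import date


def replace_reference_dates(history: list[dict], new_rows: list[dict]) -> list[dict]:
    """Drop historical rows whose reference_dt is being reloaded, then append the new rows."""
    # Dates loaded in this run (normally exactly one month-end date).
    incoming = {row["reference_dt"] for row in new_rows}
    # Keep only the history for months that are not being reloaded.
    kept = [row for row in history if row["reference_dt"] not in incoming]
    return kept + list(new_rows)


# Hypothetical data: January and February are already historicized, February is re-run.
history = [
    {"reference_dt": date(2024, 1, 31), "value": 10},
    {"reference_dt": date(2024, 2, 29), "value": 20},
]
rerun = [{"reference_dt": date(2024, 2, 29), "value": 25}]
result = replace_reference_dates(history, rerun)
# result keeps January (10) and holds only the new February row (25)
```

In PySpark the same idea would roughly be history_df.filter(~F.col("reference_dt").isin(dates)).unionByName(new_df), with dates collected from the new batch.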

To do this, I thought of using a normal (non-incremental) transform:


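from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform(
    output_h=Output(table_h),  # historical "_h" dataset
    my_input=Input(table),     # monthly dataset, one reference_dt per build
)
def my_compute_function(my_input, output_h):

    df = my_input.dataframe()

    # reference_dt is unique in my_input, so max() simply returns that single date
    current_dt = df.select(F.max("reference_dt")).collect()[0][0]

    # Existing history minus the month being (re)loaded.
    # filter() keeps the schema even when no rows remain, so no empty-case branch is needed.
    output_prov = output_h.dataframe()
    output_prov2 = output_prov.filter(F.col("reference_dt") != current_dt)

    # unionByName matches columns by name rather than by position
    output_new = output_prov2.unionByName(df)

    # A plain (non-incremental) transform replaces the whole output on each build
    output_h.write_dataframe(output_new)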

Given that "reference_dt" is always unique in the "my_input" table (so the max/distinct returns only one value), the problem is that I get an "unrecognized empty schema" error, and I can't tell which table it refers to. I think it happens because the historical _h table contains only one month and I'm trying to overwrite it with itself. Where are the problems? Could you help me, please? Sorry if I'm missing something; I will add more details if needed. Thanks.
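Because F.max would silently pick one date if the premise ever broke (for example, two months loaded at once), it may be safer to check the premise explicitly. A plain-Python sketch, with single_reference_dt as a hypothetical helper name; in PySpark the set of dates could come from df.select("reference_dt").distinct().collect().

```python
from datetime import date


def single_reference_dt(rows):
    """Return the single reference_dt found in rows, or raise if there isn't exactly one."""
    dates = {row["reference_dt"] for row in rows}
    if len(dates) != 1:
        # 0 dates: the input is empty; 2+ dates: more than one month was loaded.
        found = sorted(dates, key=str)
        raise ValueError(f"expected exactly one reference_dt, found {len(dates)}: {found}")
    return next(iter(dates))


# Hypothetical usage: one month, several rows sharing the same date.
month = [{"reference_dt": date(2024, 1, 31), "value": 1},
         {"reference_dt": date(2024, 1, 31), "value": 2}]
print(single_reference_dt(month))  # 2024-01-31
```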

Upvotes: 0

Views: 95

Answers (1)

ZettaP

Reputation: 1379

As I understand it, you want one dataset that will contain the "current month" and be refreshed every month. Is that correct?

If so, this dataset can simply be the output of this transform. You can't have the same dataset as both an input and an output, since that creates a cycle, which is not allowed.

However, you can read from a dataset that is declared as an output. This effectively achieves the same thing without, strictly speaking, creating a cycle.

You can find a number of examples here: https://www.palantir.com/docs/foundry/transforms-python/incremental-examples

The typical code is:

from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    current_output_df = processed.dataframe()

    # Do something with your current output, as if it were an input

    # Set the write mode to "replace" if the output should be replaced on each run,
    # or to "modify" if you only want to append to it.
    processed.set_mode("replace")
    processed.write_dataframe(...)
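The difference between the two write modes, and why "replace" suits the re-run case, can be illustrated with a toy in-memory model. ToyOutput and monthly_run are hypothetical names and this is not the Foundry API: previous() stands in for reading the output's last committed state, and write() applies either "replace" or "modify" semantics as described in the comment in the code above.

```python
from dataclasses import dataclass, field


@dataclass
class ToyOutput:
    """Toy stand-in for an incremental output. NOT the Foundry API."""
    committed: list = field(default_factory=list)

    def previous(self) -> list:
        # What the last run wrote; empty on the very first run.
        return list(self.committed)

    def write(self, rows: list, mode: str) -> None:
        if mode == "replace":
            # The new view is exactly what this run wrote.
            self.committed = list(rows)
        elif mode == "modify":
            # The new rows are appended to the previous view.
            self.committed = self.committed + list(rows)
        else:
            raise ValueError(f"unknown mode: {mode}")


def monthly_run(output: ToyOutput, month_rows: list) -> None:
    # Read the output's own previous state (not a cycle: it is an earlier version).
    incoming = {r["reference_dt"] for r in month_rows}
    kept = [r for r in output.previous() if r["reference_dt"] not in incoming]
    output.write(kept + month_rows, mode="replace")


out = ToyOutput()
monthly_run(out, [{"reference_dt": "2024-01", "v": 1}])
monthly_run(out, [{"reference_dt": "2024-02", "v": 2}])
monthly_run(out, [{"reference_dt": "2024-02", "v": 3}])  # February re-run
print(out.committed)
# [{'reference_dt': '2024-01', 'v': 1}, {'reference_dt': '2024-02', 'v': 3}]
```

With "modify", re-running February would leave both February versions in the output, which is the duplication the question wants to avoid. In Foundry itself, the linked incremental examples read the prior output with a call along the lines of processed.dataframe('previous', schema); as far as I know, passing an explicit schema is what lets the first run, when there is no previous output yet, return an empty DataFrame instead of failing, which may be related to the empty-schema error in the question.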

Upvotes: 0
