Reputation: 1
I have a problem with a PySpark Foundry repository transform. Until now I have been using an @incremental transform that allowed me, each month, to append a DB extract to a historical "_h" table by adding the month in question. However, I would now like to do one more thing: if the "reference_dt" of the month's table already exists in the historical table (for example, a re-transform of a month), delete the data historicized at that date and append the new data in its place.
To do this, I thought of using a normal transform:
from pyspark.sql import functions as F
from transforms.api import transform, Input, Output


@transform(
    output_h=Output(table_h),
    my_input=Input(table),
)
def my_compute_function(my_input, output_h):
    df = my_input.dataframe()
    distinct_year = df.select(F.max("reference_dt")).distinct().collect()[0][0]
    output_prov = output_h.dataframe()
    output_prov2 = output_prov.filter(F.col("reference_dt") != distinct_year)
    if output_prov2.rdd.isEmpty():
        output_prov2 = output_prov.limit(0)
    output_new = output_prov2.union(df)
    output_h.write_dataframe(output_new)
Premised that "reference_dt" is always unique in the "my_input" table, so the distinct returns only one value, the problem it gives me is an unrecognized empty schema error (it doesn't say which table). I think it's because the historical "_h" table only has one month and I'm trying to overwrite it with itself. Where are the problems? Could you help me, please? Sorry if I'm missing something; I'll add some specs if recommended. Thanks.
Upvotes: 0
Views: 95
Reputation: 1379
From what I understand, you want to have one dataset that contains the "current month" and to refresh it every month? Is that correct?
If so, you can have this dataset as an output of your transform. You can't have a dataset in your outputs AND in your inputs at the same time, as this constitutes a cycle, which is not allowed.
However, in an incremental transform you can have a dataset as an output and read from it, effectively achieving the same thing but without a cycle strictly speaking.
You have a bunch of examples there: https://www.palantir.com/docs/foundry/transforms-python/incremental-examples
The typical code is:
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    students=Input('/examples/students_hair_eye_color'),
    processed=Output('/examples/hair_eye_color_processed')
)
def incremental_filter(students, processed):
    new_students_df = students.dataframe()
    current_output_df = processed.dataframe()
    # Do something with your current output, as if it were an input.
    # If you want the output to be replaced on each run, set the write mode
    # to "replace"; if you want to append only, set the mode to "modify".
    processed.set_mode("replace")
    processed.write_dataframe(...)
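Building on that pattern, here is a minimal sketch of how this could look for your case. Assumptions: the input carries exactly one reference_dt per run (as you stated), table and table_h stand for your own dataset paths, and reading the output with dataframe('previous', schema) returns an empty dataframe with that schema on the very first run, which also avoids the empty-schema error you hit:

from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output


@incremental()
@transform(
    output_h=Output(table_h),  # your historical "_h" dataset
    my_input=Input(table),     # the monthly extract
)
def my_compute_function(my_input, output_h):
    new_df = my_input.dataframe()
    # The single reference_dt carried by this month's extract.
    month_dt = new_df.select(F.max("reference_dt")).collect()[0][0]

    # Read the previous state of the output (allowed inside @incremental);
    # passing a schema makes the first-ever run return an empty dataframe
    # instead of failing on a missing schema.
    previous_df = output_h.dataframe('previous', schema=new_df.schema)

    # Drop any rows already historicized at that date, then append the new data.
    kept_df = previous_df.filter(F.col("reference_dt") != F.lit(month_dt))

    output_h.set_mode("replace")
    output_h.write_dataframe(kept_df.unionByName(new_df))

On a re-transform of a month that is already in the historical table, the filter removes the old rows for that reference_dt before the new ones are written, which is exactly the replace-the-month behaviour you described.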
Upvotes: 0