Reputation: 671
Situation: I have monthly snapshots that should look like this
snapshot-2021-10.parquet
snapshot-2021-11.parquet
snapshot-2021-12.parquet
snapshot-2022-01.parquet
snapshot-2022-02.parquet
In the processing, I need the last n (say 3) snapshots up to a given date. So if the date is 2022-01, I would need to process 2021-11, 2021-12 and 2022-01.
Imagine the processing node wrapping a function
from typing import List
import pandas as pd

def process(snapshots: List[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(snapshots).groupby("id")["value"].sum().reset_index()
Question:
How do I set up the node, the pipeline and the Data Catalog entry for this? The goal is to be able to just call kedro run --pipeline processing --params yearmon:2022-01
What I considered:
Upvotes: 0
Views: 628
Reputation: 1516
I think you're looking for PartitionedDataSet or IncrementalDataSet: you get a dictionary of partition IDs mapped to lazy load() methods that you can use like this.
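For example, a node function receiving that dictionary could select and load only the partitions it needs. A minimal sketch, assuming the partition keys look like "snapshot-2021-11" (the exact keys depend on your path and filename_suffix settings) and that yearmon and n arrive as parameters:

from typing import Callable, Dict
import pandas as pd

def process_last_n(
    partitions: Dict[str, Callable[[], pd.DataFrame]],
    yearmon: str,  # e.g. "2022-01", passed via --params
    n: int = 3,
) -> pd.DataFrame:
    # Keep the last n partition keys up to the requested year-month;
    # plain string comparison works because the keys sort chronologically.
    wanted = sorted(
        key for key in partitions
        if key.replace("snapshot-", "") <= yearmon
    )[-n:]
    # Only the selected partitions are actually read (lazy load()).
    snapshots = [partitions[key]() for key in wanted]
    return pd.concat(snapshots).groupby("id")["value"].sum().reset_index()

In the catalog, the corresponding entry would be a PartitionedDataSet whose underlying dataset is a parquet dataset pointing at the folder holding the snapshot files.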
Also, I'm pretty sure Spark and Dask allow you to use wildcards here, i.e. snapshot-*.parquet.
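For instance, with Dask something along these lines should work (a sketch, assuming the files live under a data/snapshots/ folder):

import dask.dataframe as dd

# The glob pattern picks up every monthly snapshot at once;
# Dask only reads the data when .compute() is called.
df = dd.read_parquet("data/snapshots/snapshot-*.parquet")
result = df.groupby("id")["value"].sum().compute().reset_index()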
Upvotes: 1