Reputation: 16856
How can I force a particular dataset to build non-incrementally without changing the semantic version in the transforms repo?
Details about our specific use case:
We have about 50 datasets defined by a single incremental Python transform, registered manually in a for-loop. The input to this transform can be anywhere from hundreds to tens of thousands of small gzip files, so when the larger dataset builds as a snapshot, it ends up partitioning all of these into only a handful of well-sized parquet files, which is perfect for our downstream jobs. However, after this job has been running incrementally for months (with files arriving every hour), there will also be a large number of small parquet files in the output. We'd like to be able to force a snapshot build of this single dataset without having to bump the semantic version of the transform, which would trigger snapshot builds for all 50 datasets. Is this possible?
I understand a potential workaround could be to define a "max output files" in the transform itself, read the current number of files in the existing output, and force a snapshot if that count exceeds the maximum. However, since this pipeline is time-sensitive (it needs to run in under an hour), this would introduce unpredictability, since a snapshot build takes much longer. We'd like to be able to set these full snapshot builds to run about once a month, on a weekend.
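For reference, that workaround might look roughly like the sketch below. The paths, threshold, and repartition count are illustrative, and it assumes the incremental output supports reading its previous view via `dataframe('previous')` and that `set_mode('replace')` commits a SNAPSHOT transaction:

```python
from pyspark.sql import functions as F
from transforms.api import transform, incremental, Input, Output

MAX_OUTPUT_FILES = 200  # illustrative threshold, tune to your sizing

@incremental()
@transform(
    out=Output("/examples/compacted"),   # hypothetical paths
    source=Input("/examples/raw_gzip"),
)
def compute(source, out):
    # Count the parquet files backing the current output by asking Spark
    # which file each row of the previous output view came from (assumed:
    # 'previous' mode on an incremental output reads the last built view).
    prev = out.dataframe("previous")
    n_files = prev.select(F.input_file_name()).distinct().count()

    if n_files > MAX_OUTPUT_FILES:
        # Too fragmented: rebuild from the full current input view and
        # replace the output with a single well-partitioned SNAPSHOT.
        out.set_mode("replace")
        out.write_dataframe(source.dataframe("current").repartition(8))
    else:
        # Normal hourly run: append only the newly arrived rows.
        out.write_dataframe(source.dataframe())
```

As the question notes, this makes run time unpredictable, since any hourly run that crosses the threshold silently becomes a much slower snapshot build.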
Upvotes: 2
Views: 1195
Reputation: 91
I think you could use, for the input:
input = input.dataframe('current')
and for the output:
output.set_mode('replace')
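Put in context, those two calls together make an incremental transform rebuild its output in one go. A minimal sketch, assuming hypothetical paths and a hard-coded switch:

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    out=Output("/examples/output"),   # hypothetical paths
    source=Input("/examples/input"),
)
def compute(source, out):
    # 'current' reads the full current view of the input rather than just
    # the rows added since the last build.
    df = source.dataframe("current")
    # 'replace' commits the result as a SNAPSHOT transaction instead of
    # appending to the existing output.
    out.set_mode("replace")
    out.write_dataframe(df)
```

Note that, as written, this would snapshot on every run; it still needs to be gated by some runtime condition or schedule to address the asker's once-a-month requirement.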
Upvotes: 0
Reputation: 967
My preferred approach these days is to use what I call a "snapshot dataset". This approach lets you inject a snapshot transaction into your pipeline at any arbitrary point, as well as schedule snapshot builds at regular intervals, which can be very useful for keeping long-lived, low-latency pipelines performant.
For this, I use a wrapper when declaring my transforms (Java transforms in my case, but it applies similarly to Python) which adds an additional input to my transform.
Let's say you start with a transform that reads datasets A and B and produces dataset C. The wrapper will insert an additional dataset as input, called CSnapshotDataset, as well as generate a transform that produces this (empty) dataset.
The automatically generated transform that produces CSnapshotDataset will always put an empty SNAPSHOT transaction into the dataset whenever it is built. When there's a new snapshot transaction coming from CSnapshotDataset, your transform will output a snapshot transaction as well.
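In Python transforms, a minimal sketch of the two pieces might look like the following. Paths, names, and the union logic are all illustrative; it relies on the (assumed) @incremental behaviour of falling back to a non-incremental, snapshot build whenever any input has received a SNAPSHOT transaction since the last run:

```python
from transforms.api import transform, incremental, Input, Output

# Generated trigger transform: a plain (non-incremental) transform always
# commits a SNAPSHOT transaction, so every build of CSnapshotDataset
# replaces it with a fresh, empty snapshot.
@transform(out=Output("/examples/CSnapshotDataset"))  # hypothetical path
def snapshot_trigger(ctx, out):
    out.write_dataframe(ctx.spark_session.createDataFrame([], "touched TIMESTAMP"))

# The wrapped transform: CSnapshotDataset is wired in as an extra input.
# When it is rebuilt, the incremental framework sees a non-appended input,
# runs this transform non-incrementally, and writes C as a snapshot;
# otherwise the build stays incremental.
@incremental()
@transform(
    out=Output("/examples/C"),
    a=Input("/examples/A"),
    b=Input("/examples/B"),
    trigger=Input("/examples/CSnapshotDataset"),
)
def compute(a, b, trigger, out):
    out.write_dataframe(a.dataframe().unionByName(b.dataframe()))
```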
To then snapshot your pipeline from a given point onwards, for instance from and including dataset C, you simply select C's snapshot dataset (CSnapshotDataset in this case) and build it. The next (scheduled) run of the pipeline will snapshot C and everything downstream from it.
To run this on a regular interval, you can then set a schedule to build CSnapshotDataset.
I apply this wrapper generously (generally to any transform I write), which gives me the flexibility to snapshot the pipeline from any dataset that might need it.
While it's a little more up-front work to set this up, the major advantage is that you can snapshot the pipeline from any point onwards, on demand or on a regular schedule, without changing any transform code or bumping semantic versions.
Upvotes: 2
Reputation: 110
I think you can simply decide at run time whether to call TransformOutput.set_mode() on your output with 'replace' or 'modify'. This way, you could decide, based on the sizing of your inputs, whether you'd like to overwrite or append to the output.
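As a sketch, keying the decision off the size of the new input files (assumed: in incremental mode, the input's filesystem listing returns only files added since the last build; paths and the threshold are illustrative):

```python
from transforms.api import transform, incremental, Input, Output

@incremental()
@transform(
    out=Output("/examples/output"),   # hypothetical paths
    source=Input("/examples/input"),
)
def compute(source, out):
    # Size up the files arriving in this run.
    new_bytes = sum(f.size for f in source.filesystem().ls())

    if new_bytes > 10 * 1024**3:  # illustrative 10 GiB threshold
        # Large backlog: rewrite the whole output as one snapshot.
        out.set_mode("replace")
        out.write_dataframe(source.dataframe("current"))
    else:
        # Small delta: append only the newly arrived rows.
        out.set_mode("modify")
        out.write_dataframe(source.dataframe())
```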
Upvotes: 0