Reputation: 1
I have a requirement to update two tables at the end of a Dataflow pipeline job, based on the pipeline result.
I have put the logic that updates the tables in a finally block.
However, the finally block executes in parallel with the pipeline instead of after it.
Is there a way to run this code at the end of the pipeline?
Upvotes: 0
Views: 425
Reputation: 1092
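I think the answer suggested by @robertwb, using waitUntilFinish(), may not work on GCP; see the link and explanation from the Google Cloud docs at the end of this answer. However, we can run the final step as part of the pipeline itself by placing Combine.globally() just before it. In a batch pipeline, Combine.globally() emits its single result only after all upstream elements have been processed, so a ParDo applied to that result runs once, at the end.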
// sample pipeline
pipeline.apply("step_1", ParDo.of(step_1()))
    .apply("step_2", ParDo.of(step_2()))
    .apply("step_3", ParDo.of(step_3()))
    // collapses all rows into one element, so step 5 fires once
    .apply("step_4_wait_until_finish", combineGlobally())
    .apply("step_5_final", ParDo.of(new Step_5_final()));

// combineGlobally
static Globally<TableRow, TableRow> combineGlobally() {
    return Combine.globally(new SerializableFunction<Iterable<TableRow>, TableRow>() {
        @Override
        public TableRow apply(Iterable<TableRow> input) {
            // the content is irrelevant; the element only signals "upstream is done"
            return new TableRow();
        }
    }).withoutDefaults(); // note: with withoutDefaults(), an empty input produces no output, so step 5 would not run
}

// Step_5_final
static class Step_5_final extends DoFn<TableRow, String> {
    @ProcessElement
    public void onElement(@Element final TableRow dataIn, final OutputReceiver<String> out) throws Exception {
        // runs once, after all upstream steps have completed
        BigQuery.execute(update_query_1);
        BigQuery.execute(update_query_2);
        out.output("SUCCESS");
    }
}
Refer to this link https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#limitations
The following limitations apply to Flex Templates jobs in GCP:
Upvotes: 1
Reputation: 5104
The PipelineResult object should have a waitUntilFinish operation that you can call from within your try block to ensure the pipeline completes before the finally block executes.
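A minimal sketch of that control flow, in case it helps. To keep it compilable without the Beam dependency, `JobResult` and `State` below are hypothetical stand-ins for Beam's `PipelineResult` and `PipelineResult.State`, and the two "updates" are just recorded as strings. In real code you would call `waitUntilFinish()` on the object returned by `pipeline.run()` and execute your table updates where the strings are added. This assumes a launch environment where blocking on the result is supported.

```java
import java.util.ArrayList;
import java.util.List;

public class FinalStepSketch {

    /** Hypothetical stand-in for Beam's PipelineResult.State. */
    enum State { DONE, FAILED, CANCELLED }

    /** Hypothetical stand-in for Beam's PipelineResult. */
    interface JobResult {
        State waitUntilFinish();
    }

    /** Blocks until the job ends, then runs the updates only on success. */
    static List<String> finishJob(JobResult result) {
        List<String> executed = new ArrayList<>();
        State state = State.FAILED; // assumed if waiting itself throws
        try {
            // Blocking here is what keeps the finally block from
            // running while the pipeline is still in progress.
            state = result.waitUntilFinish();
        } finally {
            if (state == State.DONE) {
                executed.add("update_query_1"); // placeholder for the first table update
                executed.add("update_query_2"); // placeholder for the second table update
            }
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(finishJob(() -> State.DONE));   // prints [update_query_1, update_query_2]
        System.out.println(finishJob(() -> State.FAILED)); // prints []
    }
}
```

The updates sit behind a check on the terminal state because the finally block also runs when the job fails or is cancelled; adjust the condition if the tables should record failures too.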
Upvotes: 0