raj

Reputation: 1

How to run some code at the end of a pipeline

I need to update two tables at the end of a Dataflow job, based on the pipeline result.

I have put the update logic in a finally block.

However, the finally block executes in parallel with the pipeline instead of after it.

Is there a way to run the code only once the pipeline has finished?

Upvotes: 0

Views: 425

Answers (2)

zikzakjack

Reputation: 1092

I think the answer from @robertwb suggesting waitUntilFinish() may not work on GCP when the job is launched from a Flex Template; see the link and the quoted limitations from the Google Cloud docs at the end of this answer. However, we can run the final step as part of the pipeline itself by placing Combine.globally() just before a final ParDo, so that the final ParDo runs only after all upstream steps have produced their output.

// sample pipeline

    pipeline.apply("step_1", ParDo.of(step_1()))
        .apply("step_2", ParDo.of(step_2()))
        .apply("step_3", ParDo.of(step_3()))
        // Collapse all upstream output into one element so the next step runs once.
        .apply("step_4_wait_until_finish", combineGlobally())
        .apply("step_5_final", ParDo.of(new Step_5_final()));

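// combineGlobally

    static Combine.Globally<TableRow, TableRow> combineGlobally() {
        // Reduce the whole PCollection to a single placeholder TableRow.
        // With withoutDefaults(), an empty input yields no output element.
        return Combine.globally(new SerializableFunction<Iterable<TableRow>, TableRow>() {
            @Override
            public TableRow apply(Iterable<TableRow> input) {
                return new TableRow();
            }
        }).withoutDefaults();
    }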

// Step_5_final

    static class Step_5_final extends DoFn<TableRow, String> {
        @ProcessElement
        public void onElement(@Element final TableRow dataIn, final OutputReceiver<String> out) throws Exception {
            // Placeholder calls: run the two table updates here.
            BigQuery.execute(update_query_1);
            BigQuery.execute(update_query_2);
            out.output("SUCCESS");
        }
    }

Refer to this link https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates#limitations

The following limitations apply to Flex Templates jobs in GCP:

  • The program that constructs the pipeline must exit after run is called in order for the pipeline to start.
  • waitUntilFinish (Java) and wait_until_finish (Python) are not supported.

Upvotes: 1

robertwb

Reputation: 5104

The PipelineResult object should have a waitUntilFinish operation that you can call from within your try block to ensure the pipeline completes before the finally block executes.
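
To illustrate the ordering, here is a minimal sketch that uses only the Java standard library. The `Result` class, the `State` enum, and the `run` method below are hypothetical stand-ins written for this example; they are not the Beam classes. In a real job you would presumably keep the result of `pipeline.run()` and call `waitUntilFinish()` on it in the same position.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stand-in model of an asynchronously running pipeline. NOT the Beam API.
class PipelineEndHook {

    // Hypothetical terminal states for this sketch.
    enum State { DONE, FAILED }

    // Hypothetical handle returned by run(); mimics a blocking wait call.
    static final class Result {
        private final CompletableFuture<State> future;

        Result(CompletableFuture<State> future) {
            this.future = future;
        }

        // Blocks the calling thread until the background job has ended.
        State waitUntilFinish() {
            return future.join();
        }
    }

    // Starts the "pipeline" on another thread and returns immediately,
    // which is why a finally block without a wait can run too early.
    static Result run(List<String> log, boolean succeed) {
        return new Result(CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200); // simulated work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("pipeline finished");
            return succeed ? State.DONE : State.FAILED;
        }));
    }

    // Waits inside the try block, so the finally block runs afterwards.
    static List<String> runThenUpdate(boolean succeed) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        State state = State.FAILED; // assume failure until told otherwise
        try {
            Result result = run(log, succeed);
            state = result.waitUntilFinish();
        } finally {
            // The table updates would go here, using the final state.
            log.add("update tables, state=" + state);
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints "pipeline finished", then "update tables, state=DONE".
        runThenUpdate(true).forEach(System.out::println);
    }
}
```

If the wait call is removed from the try block in this sketch, the finally block can run before "pipeline finished" is logged, which matches the behaviour described in the question.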

Upvotes: 0
