Anthony Naddeo

Reputation: 2751

Dataflow Pipeline workers stall when passing extra arguments in PipelineOptions

I have a Dataflow job defined in Apache Beam that normally works fine but breaks when I attempt to include all of my custom command line options in the PipelineOptions that I pass to beam.Pipeline(options=pipeline_options). It fails after the graph is constructed but before the first step starts: the worker becomes unresponsive after starting up, and the job eventually times out with no useful logs.

I would like to pass my custom options because only the options that you pass directly to the pipeline show up on the right-hand side of the Dataflow console UI, and it's very handy to be able to see them.

The full broken example is here. The old version that works looked more or less like this:

def run():
    parser = argparse.ArgumentParser()
    # Many parser.add_argument lines

    known_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        # Pipeline definition
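
For context, with this pattern the custom flags are consumed by argparse and never reach the PipelineOptions object, which is why they don't show up in the Dataflow console. A minimal sketch of that split, using a couple of the argument names listed further down with made-up values:

import argparse
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument("--api_key")
parser.add_argument("--dataset_id")

# argparse consumes the custom flags; only the remainder reaches Beam
known_args, pipeline_args = parser.parse_known_args(
    ["--api_key=test", "--dataset_id=test", "--runner=DataflowRunner"]
)
# known_args    -> Namespace(api_key='test', dataset_id='test')
# pipeline_args -> ['--runner=DataflowRunner']
pipeline_options = PipelineOptions(pipeline_args)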

The code that doesn't work looks like this:

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # same lines of parser.add_argument

def run():
    pipeline_options = CustomOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        # Same pipeline definition

Here are the extra keys that I end up passing into the PipelineOptions object.

api_key
dataset_id
date_column
date_grouping_frequency
input_bigquery_sql
input_mode
org_id
output
output_executable_path # This one isn't really me, it just ends up in there

Setting aside that the argparse/PipelineOptions API seems to be based entirely on side effects, I can't make sense of why this would cause the job to fail to start. My best guess is that one of the options I'm passing through is overwriting something or having some unintended side effect on the worker, but I've done this sort of thing before, so I know it's possible in general to pass options through like this and have the pipeline work.

Can someone spot an issue that might cause the first worker to become unresponsive? Something about the way I'm passing options in seems to be the problem.

Upvotes: 1

Views: 511

Answers (1)

Mazlum Tosun

Reputation: 6572

I tested with your arguments, Beam version 2.41.0, and Python 3.8.12:

"api_key": "test",
"dataset_id": "test",
"date_column": "test",
"date_grouping_frequency": "test",
"input_bigquery_sql": "test",
"input_mode": "test",
"org_id": "test",
"output": "test",
"output_executable_path": "test"

In the Beam options:

class CustomOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--api_key", help="Api key", required=True)
        parser.add_argument("--dataset_id", help="dataset ID", required=True)

        parser.add_argument("--date_column", help="datdate_column", required=True)
        parser.add_argument("--date_grouping_frequency", help="date_grouping_frequency", required=True)
        parser.add_argument("--input_bigquery_sql", help="input_bigquery_sql", required=True)
        parser.add_argument("--input_mode", help="input_mode", required=True)
        parser.add_argument("--org_id", help="org_id", required=True)
        parser.add_argument("--output", help="output", required=True)
        parser.add_argument("--output_executable_path", help="output_executable_path", required=True)

In the Beam pipeline:

def run():
    # The same command-line flags, viewed as the standard Beam options and as
    # the custom options
    pipeline_options = PipelineOptions()
    custom_pipeline_options = pipeline_options.view_as(CustomOptions)

    with beam.Pipeline(options=pipeline_options) as p:
        # Get your custom option arguments
        api_key = custom_pipeline_options.api_key
        dataset_id = custom_pipeline_options.dataset_id
        # ...
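
As a usage sketch (a continuation of the snippet above; use_standard_sql and feeding the value into ReadFromBigQuery are my assumptions, not part of your code), the custom values can then be used anywhere in the pipeline definition, for example:

        # still inside the `with beam.Pipeline(...) as p` block
        rows = (
            p
            | "ReadInput" >> beam.io.ReadFromBigQuery(
                query=custom_pipeline_options.input_bigquery_sql,
                use_standard_sql=True,
            )
        )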

When the argument output_executable_path is part of the options, I get the following error:

[2022-11-18, 22:51:38 UTC] {beam.py:127} WARNING - argparse.ArgumentError: argument --output_executable_path: conflicting option string: --output_executable_path

There is a conflict with an argument that Beam uses internally.

When I remove the argument output_executable_path from the options, the Dataflow job works without issue.
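
In other words, a sketch of the fix (the other arguments stay exactly as above): drop the conflicting argument from CustomOptions and let Beam's own, internal definition of --output_executable_path handle that flag.

class CustomOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--api_key", help="Api key", required=True)
        parser.add_argument("--dataset_id", help="dataset ID", required=True)
        # ... the other custom arguments, unchanged ...
        # --output_executable_path is deliberately NOT declared here: Beam
        # already registers that option internally, which is what produces
        # the "conflicting option string" error above.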

Can you test without this argument, please?

Upvotes: 2
