I have a Dataflow job defined in Apache Beam that works fine normally, but breaks when I try to include all of my custom command-line options in the `PipelineOptions` that I pass to `beam.Pipeline(options=pipeline_options)`. It fails after the graph is constructed but before the first step starts: the worker becomes unresponsive after starting up, and the job eventually times out with no useful logs.
I would like to pass my custom options because only the options that you pass directly to the pipeline show up on the right-hand side of the Dataflow console UI, and it's very handy to be able to see them.
A full broken example is here. The old version that works looked more or less like this:
```python
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

def run():
    parser = argparse.ArgumentParser()
    # Many parser.add_argument lines
    known_args, pipeline_args = parser.parse_known_args()
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        # Pipeline definition
        ...
```
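In the working version, `parse_known_args` splits the command line in two: flags declared on the custom `argparse` parser end up in `known_args`, and only the leftover `pipeline_args` reach `PipelineOptions`, which is why the custom flags don't appear in the Dataflow console. A stdlib-only sketch of that split; the `split_args` helper, the flag values, the bucket and the project name are all made up for illustration:

```python
import argparse

def split_args(argv):
    """Split argv the way the working run() does: custom flags vs. everything else."""
    parser = argparse.ArgumentParser()
    # Two of the custom flags from the question; the others would be declared the same way.
    parser.add_argument("--org_id")
    parser.add_argument("--output")
    # Unknown flags (e.g. Beam/Dataflow ones) are returned untouched in the second value.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args

known_args, pipeline_args = split_args(
    ["--org_id=42", "--output=gs://my-bucket/out", "--runner=DataflowRunner", "--project=my-project"]
)
print(known_args.org_id)  # 42
print(pipeline_args)      # ['--runner=DataflowRunner', '--project=my-project']
```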
The code that doesn't work looks like this:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # same lines of parser.add_argument
        ...

def run():
    pipeline_options = CustomOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        # Same pipeline definition
        ...
```
Here are the extra keys that I end up passing into the `PipelineOptions` object:

- `api_key`
- `dataset_id`
- `date_column`
- `date_grouping_frequency`
- `input_bigquery_sql`
- `input_mode`
- `org_id`
- `output`
- `output_executable_path` (this one isn't really me, it just ends up in there)
Setting aside that the argparse/`PipelineOptions` API seems to be based entirely on side effects, I can't make sense of why this would make the job fail to start. My best guess is that one of the options I'm passing through is overwriting something or having some unintended side effect on the worker, but I've done this sort of thing before, so I know it's possible in general to pass options through like this and have the pipeline work.
Can someone spot some issue that might cause the first worker to become unresponsive? Something about the way I'm passing options in seems to be the issue.
I tested with your arguments, Beam version 2.41.0 and Python 3.8.12:
```json
{
  "api_key": "test",
  "dataset_id": "test",
  "date_column": "test",
  "date_grouping_frequency": "test",
  "input_bigquery_sql": "test",
  "input_mode": "test",
  "org_id": "test",
  "output": "test",
  "output_executable_path": "test"
}
```
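On the command line, each of these entries becomes a `--name=value` flag. A small sketch of that mapping; `to_flags` is a hypothetical helper written for illustration, not part of Beam, and only three of the test keys are shown:

```python
def to_flags(values):
    """Turn {"name": "value"} into ["--name=value", ...], keeping insertion order."""
    return [f"--{name}={value}" for name, value in values.items()]

test_args = {
    "api_key": "test",
    "dataset_id": "test",
    "org_id": "test",
}
print(to_flags(test_args))
# ['--api_key=test', '--dataset_id=test', '--org_id=test']
```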
In the Beam options:
```python
from apache_beam.options.pipeline_options import PipelineOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--api_key", help="Api key", required=True)
        parser.add_argument("--dataset_id", help="dataset ID", required=True)
        parser.add_argument("--date_column", help="date_column", required=True)
        parser.add_argument("--date_grouping_frequency", help="date_grouping_frequency", required=True)
        parser.add_argument("--input_bigquery_sql", help="input_bigquery_sql", required=True)
        parser.add_argument("--input_mode", help="input_mode", required=True)
        parser.add_argument("--org_id", help="org_id", required=True)
        parser.add_argument("--output", help="output", required=True)
        parser.add_argument("--output_executable_path", help="output_executable_path", required=True)
```
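The side effects mentioned in the question come from how the flags are gathered. As far as I understand Beam's implementation, when it collects all options, every imported `PipelineOptions` subclass (Beam's own option classes as well as `CustomOptions`) registers its flags through `_add_argparse_args` on one shared parser. Below is a simplified, stdlib-only model of that pattern; `ToyOptions`, `ToyBuiltinOptions`, `ToyCustomOptions` and `parse` are hypothetical names, not Beam's API:

```python
import argparse

class ToyOptions:
    """Toy stand-in for PipelineOptions: subclasses register flags on a shared parser."""

    @classmethod
    def _add_argparse_args(cls, parser):
        pass

class ToyBuiltinOptions(ToyOptions):
    # Plays the role of one of Beam's own option classes.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--runner")

class ToyCustomOptions(ToyOptions):
    # Plays the role of the user's CustomOptions.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--api_key")
        parser.add_argument("--org_id")

def parse(argv):
    parser = argparse.ArgumentParser(allow_abbrev=False)
    # Every direct subclass contributes its flags to the same parser.
    for cls in ToyOptions.__subclasses__():
        cls._add_argparse_args(parser)
    known, _ = parser.parse_known_args(argv)
    return vars(known)

opts = parse(["--runner=DataflowRunner", "--org_id=42"])
print(opts["runner"], opts["org_id"], opts["api_key"])  # DataflowRunner 42 None
```

Because every class writes into the same parser, a custom class cannot reuse a flag name that one of the built-in classes already declares.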
In the Beam pipeline:
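```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    # PipelineOptions() without flags parses sys.argv; view_as exposes the CustomOptions fields.
    custom_pipeline_options = PipelineOptions().view_as(CustomOptions)
    pipeline_options = PipelineOptions()
    with beam.Pipeline(options=pipeline_options) as p:
        # Get your custom option arguments
        custom_pipeline_options.api_key
        custom_pipeline_options.dataset_id
        # ...
```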
When the argument `output_executable_path` is part of the options, I get the following error:
```
[2022-11-18, 22:51:38 UTC] {beam.py:127} WARNING - argparse.ArgumentError: argument --output_executable_path: conflicting option string: --output_executable_path
```
This is a conflict with an argument that Beam already uses internally.
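The error itself is plain `argparse` behaviour: registering the same option string twice on one parser raises `argparse.ArgumentError` with the default `conflict_handler="error"`. A minimal sketch, where the first `add_argument` call stands in for the flag Beam already defines (an assumption for illustration; Beam's real definition has its own help text and default) and `register_twice` is a hypothetical helper:

```python
import argparse

def register_twice(flag):
    """Add `flag` to one parser twice and return the argparse error message, if any."""
    parser = argparse.ArgumentParser()
    parser.add_argument(flag)  # stands in for the option Beam already registers
    try:
        parser.add_argument(flag)  # the custom option with the same name
    except argparse.ArgumentError as err:
        return str(err)
    return None

print(register_twice("--output_executable_path"))
# argument --output_executable_path: conflicting option string: --output_executable_path
```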
When I remove the argument `output_executable_path` from the options, the Dataflow job works without issue.
Can you please test without this argument?