crbl

Reputation: 397

Unable to run dataflow pipeline in DataflowRunner mode

I run a simple test Dataflow pipeline in GCP using DirectRunner and it works fine. But when I try DataflowRunner, I believe the worker tries to install apache-beam and cannot, due to proxy access. My goal is to run the pipeline on Google Dataflow. I was under the impression that Dataflow should take care of having apache-beam pre-installed on the workers. Should I use a container with all the requirements? I appreciate any suggestions.

python3 pipeline.py \
  --project='....' \
  --region='....' \
  --service_account_email='....' \
  --dataset_id='dset_name' \
  --ingest_table_name='table_name' \
  --storage_bucket='....' \
  --temp_location='....' \
  --runner=DataFlowRunner

INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/opt/conda/bin/python3', '-m', 'pip', 'download', '--dest', '/var/tmp/tmp3ajs7rn0', 'apache-beam==2.45.0', '--no-deps', '--no-binary', ':all:']
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProxyError('Cannot connect to proxy.', NewConnectionError('<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7f4f67e52ed0>: Failed to establish a new connection: [Errno -2] Name or service not known'))': /simple/apache-beam/
......
ERROR: Could not find a version that satisfies the requirement apache-beam==2.45.0 (from versions: none)
ERROR: No matching distribution found for apache-beam==2.45.0

==== Update ==== I tried to use a docker image:

options.view_as(WorkerOptions).worker_harness_container_image = '...apache/beam_python3.7_sdk'

and I get the error: "Pip install failed for package: apache-beam==2.45.0
Output from execution of subprocess: b"Collecting apache-beam==2.45.0\n Using cached apache-beam-2.45.0.zip (3.0 MB)\n Preparing metadata (setup.py): started\n Preparing metadata (setup.py): finished with status 'error'\n"

So it would be helpful to know the correct syntax for using a container image with Dataflow pipelines.
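For reference, a minimal sketch of what I believe the option should look like (the image tag is illustrative and should match the Beam version; newer SDKs use sdk_container_image rather than the deprecated worker_harness_container_image):

from apache_beam.options.pipeline_options import PipelineOptions, WorkerOptions

options = PipelineOptions()
# Illustrative image reference; pick the tag matching your SDK version
options.view_as(WorkerOptions).sdk_container_image = 'apache/beam_python3.7_sdk:2.45.0'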

==== Update ==== I created a virtual env and installed wheel and apache-beam[gcp]. Now I receive a different and very strange error. No matter which unique name I use for the job_name, the error is: "apache_beam.runners.dataflow.internal.apiclient.DataflowJobAlreadyExistsError: There is already active job named pipeline_unique_name with id: ppline_unique_id. If you want to submit a second job, try again by setting a different name using --job_name."

Upvotes: 0

Views: 961

Answers (3)

crbl

Reputation: 397

I solved the problem. When using a "with" statement, the pipeline.run() call is not needed: the context manager runs the pipeline when the block exits, so calling run() again submits a second job with the same name, which triggers the DataflowJobAlreadyExistsError.
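A minimal sketch of the pattern (apache-beam installed locally; pipeline options elided):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()  # --runner=DataflowRunner etc. would go here

with beam.Pipeline(options=options) as p:
    (p
     | 'Create' >> beam.Create(['a', 'b', 'c'])
     | 'Print' >> beam.Map(print))

# No p.run() here: the with block already runs the pipeline on exit,
# and a second run() would resubmit a job with the same name.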

Upvotes: 0

AInguva

Reputation: 64

In versions before Beam 2.50.0, when DataflowRunner is used, the Apache Beam Python SDK downloads the apache-beam package from PyPI onto your local machine before launching the Dataflow pipeline, and then stages it in the staging location you provided.

For Dataflow Runner v2 this is not needed, because the default container that Dataflow uses already contains the apache_beam installation; Runner v1, however, needs the staged package because apache_beam is not installed in the default container that Runner v1 uses.

With Beam 2.50.0, Runner v1 is deprecated.

So in your case, I would suggest upgrading your Beam version to 2.50.0, or using Dataflow Runner v2 by passing --experiments=use_runner_v2 --sdk_location=container.

If you are already using Runner v2, or Runner v2 is auto-enabled for you, you can just pass --sdk_location=container.
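For example, building on the invocation from the question (elided values kept as placeholders):

python3 pipeline.py \
  --project='....' \
  --region='....' \
  --temp_location='....' \
  --runner=DataflowRunner \
  --experiments=use_runner_v2 \
  --sdk_location=container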

Upvotes: 0

vdolez

Reputation: 1118

--runner=DataFlowRunner explicitly tells Beam to run the code on Google Cloud Platform.

To run locally, try removing this parameter; see Apache Beam's docs:

python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE

If you want to run the pipeline on Dataflow, you need to set up your environment.

Get the correct versions of Python and pip. I suggest you use a virtualenv.
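For example (a minimal sketch; the env name is arbitrary):

python3 -m venv beam-env
source beam-env/bin/activate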

It's also important to install wheel, and then install apache-beam with the [gcp] extra:

pip install wheel
pip install 'apache-beam[gcp]'

When everything is set up, try running the example in Dataflow using the following:

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

Upvotes: 0
