Reputation: 397
I run a simple test Dataflow pipeline in GCP using DirectRunner and it works fine. But when I try DataflowRunner, I believe the worker tries to install apache-beam and cannot do so due to proxy access. My goal is to run the pipeline on Google Dataflow. I had the impression that Dataflow takes care of having apache-beam pre-installed on the workers. Should I use a container with all the requirements? I appreciate any suggestions.
python3 pipeline.py \
--project='....' \
--region='....' \
--service_account_email='....' \
--dataset_id='dset_name' \
--ingest_table_name='table_name' \
--storage_bucket='....' \
--temp_location='....' \
--runner=DataFlowRunner
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/opt/conda/bin/python3', '-m', 'pip', 'download', '--dest', '/var/tmp/tmp3ajs7rn0', 'apache-beam==2.45.0', '--no-deps', '--no-binary', ':all:']
WARNING: Retrying (Retry(total=4, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProxyError('Cannot connect to proxy.', NewConnectionError('<pip._vendor.urllib3.connection.HTTPSConnection object at 0x7f4f67e52ed0>: Failed to establish a new connection: [Errno -2] Name or service not known'))': /simple/apache-beam/
......
ERROR: Could not find a version that satisfies the requirement apache-beam==2.45.0 (from versions: none)
ERROR: No matching distribution found for apache-beam==2.45.0
==== Update ==== I tried to use a Docker image:
options.view_as(WorkerOptions).worker_harness_container_image = '...apache/beam_python3.7_sdk'
and I get the error:
"Pip install failed for package: apache-beam==2.45.0
Output from execution of subprocess: b"Collecting apache-beam==2.45.0\n Using cached apache-beam-2.45.0.zip (3.0 MB)\n Preparing metadata (setup.py): started\n Preparing metadata (setup.py): finished with status 'error'\n""
So it would be helpful to know the correct syntax for using a container image with Dataflow pipelines.
==== Update ====
I created a virtual env and installed wheel and apache-beam[gcp].
Now I receive a different and very strange error. No matter which unique name I pass as --job_name, the error is:
"apache_beam.runners.dataflow.internal.apiclient.DataflowJobAlreadyExistsError: There is already active job named pipeline_unique_name with id: ppline_unique_id. If you want to submit a second job, try again by setting a different name using --job_name.
"
Upvotes: 0
Views: 961
Reputation: 397
I solved the problem. When using the with statement, an explicit pipeline.run() call is not needed: the pipeline is run automatically when the with block exits, so calling run() again submits a second job under the same name, which causes the DataflowJobAlreadyExistsError above.
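Here is a minimal sketch of the pattern (the Create/Map steps are placeholders, not the actual pipeline); the with block submits the job itself on exit:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# In a real script these are parsed from sys.argv.
options = PipelineOptions()

# The with statement calls run() and wait_until_finish() when the block
# exits, so an extra p.run() afterwards would submit a second job under
# the same --job_name and raise DataflowJobAlreadyExistsError.
with beam.Pipeline(options=options) as p:
    (p
     | 'Create' >> beam.Create(['a', 'b', 'c'])
     | 'Print' >> beam.Map(print))

# No p.run() needed here.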
Upvotes: 0
Reputation: 64
In versions before Beam 2.50.0, when DataflowRunner is used, the Apache Beam Python SDK downloads the apache-beam package from PyPI to your local machine before launching the Dataflow pipeline, and then stages it to the staging location you provided.
For Dataflow Runner v2 this is not needed, because the default container Dataflow uses already contains the apache_beam installation; Runner v1, however, needs the staged package because apache_beam is not installed in the default container that Runner v1 uses.
With Beam 2.50.0, Runner v1 is deprecated.
So for your case, I would suggest upgrading your Beam version to 2.50.0, or using Dataflow Runner v2 by passing --experiments=use_runner_v2 --sdk_location=container.
If you are already using Runner v2, or Runner v2 is auto-enabled for you, you can just pass --sdk_location=container.
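For reference, here is a sketch of setting the same options programmatically with PipelineOptions; the project, region, and bucket names below are placeholders, not real resources:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values - substitute your own project, region, and bucket.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',
    region='us-central1',
    temp_location='gs://my-bucket/tmp',
    experiments=['use_runner_v2'],  # opt in to Runner v2 (pre-2.50.0)
    sdk_location='container',       # use the SDK already in the container
)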
Upvotes: 0
Reputation: 1118
--runner=DataFlowRunner
explicitly specifies that the code should run on Google Cloud Platform. To run locally, try removing this parameter:
python -m apache_beam.examples.windowed_wordcount --input YOUR_INPUT_FILE --output_table PROJECT:DATASET.TABLE
If you want to run the pipeline on Dataflow, you need to set up your environment: get the correct versions of python and pip (I suggest you use a virtualenv). It's also important to install wheel, and then get apache-beam along with the [gcp] extra.
pip install wheel
pip install 'apache-beam[gcp]'
When everything is set up, try running the example in Dataflow using the following:
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
Upvotes: 0