I am trying to build an Apache Beam pipeline in Python 3.7 with Beam SDK version 2.20.0. The pipeline gets deployed on Dataflow successfully, but it does not seem to be doing anything. In the worker logs, I can see the following error message reported repeatedly:
Error syncing pod xxxxxxxxxxx (), skipping: Failed to start container worker log
I have tried everything I could, but this error is quite stubborn. My pipeline looks like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import DebugOptions
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = PROJECT
options.view_as(GoogleCloudOptions).job_name = job_name
options.view_as(GoogleCloudOptions).region = region
options.view_as(GoogleCloudOptions).staging_location = staging_location
options.view_as(GoogleCloudOptions).temp_location = temp_location
options.view_as(WorkerOptions).zone = zone
options.view_as(WorkerOptions).network = network
options.view_as(WorkerOptions).subnetwork = sub_network
options.view_as(WorkerOptions).use_public_ips = False
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True
options.view_as(SetupOptions).sdk_location = ''
options.view_as(SetupOptions).save_main_session = True
options.view_as(DebugOptions).experiments = []
print('running pipeline...')
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(topic=topic_name).with_output_types(bytes)
        | 'ProcessMessage' >> beam.ParDo(Split())
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
            table=bq_table_name,
            schema=bq_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

result = pipeline.run()
I have tried supplying a Beam SDK 2.20.0 tar.gz from the compute instance using the sdk_location parameter, but that doesn't work either. I can't use sdk_location = default, as that triggers a download from pypi.org. I am working in an offline environment and connectivity to the internet is not an option. Any help would be highly appreciated.
The pipeline itself is deployed in a container, and all libraries that go with Apache Beam 2.20.0 are specified in a requirements.txt file; the Docker image installs all the libraries.
Upvotes: 1
Views: 1688
Reputation: 41
TL;DR: Copy the Apache Beam SDK archive into an accessible path and provide that path via the sdk_location option.
I was also struggling with this setup. Finally I found a solution; even though your question was asked quite a while ago, this answer might still help someone else.
There are probably multiple ways to do that, but the following two are quite simple.
As a precondition you'll need to create the apache-beam-sdk source archive as follows:
1. Clone the Apache Beam GitHub repository
2. Switch to the required tag, e.g. v2.28.0
3. cd to beam/sdks/python
4. Create the tar.gz source archive of your required Beam SDK version with: python setup.py sdist
Now you should have the source archive apache-beam-2.28.0.tar.gz in the path beam/sdks/python/dist/.
Option 1 - Use Flex Templates and copy the Apache Beam SDK in the Dockerfile
Documentation: Google Dataflow Documentation
Copy the apache-beam-2.28.0.tar.gz into an accessible path in your Docker image, e.g. COPY utils/apache-beam-2.28.0.tar.gz /tmp, because this is going to be the path you can set in your SetupOptions. The Dockerfile could look like this:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
# Due to a change in the Apache Beam base image in version 2.24, you must install
# libffi-dev manually as a dependency. For more information:
# https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
# update used packages
RUN apt-get update && apt-get install -y \
libffi-dev \
&& rm -rf /var/lib/apt/lists/*
COPY setup.py .
COPY main.py .
COPY path_to_beam_archive/apache-beam-2.28.0.tar.gz /tmp
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/setup.py"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/main.py"
RUN python -m pip install --user --upgrade pip setuptools wheel
Then, in your pipeline code (e.g. main.py), set sdk_location to the path you copied the archive to in the Dockerfile:
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
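For context, a minimal sketch of how this could look next to the other options in the template's main.py (the option values here are placeholders, not the template's exact code):
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(StandardOptions).streaming = True
# Install the Beam SDK on the workers from the archive copied into the image,
# instead of downloading it from pypi.org.
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'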
Build the Docker image, e.g. with Cloud Build:
gcloud builds submit --tag $TEMPLATE_IMAGE .
Then build the Flex Template:
gcloud dataflow flex-template build "gs://define-path-to-your-templates/your-flex-template-name.json" \
--image=gcr.io/your-project-id/image-name:tag \
--sdk-language=PYTHON \
--metadata-file=metadata.json
Finally, run the Flex Template job:
gcloud dataflow flex-template run "your-dataflow-job-name" \
--template-file-gcs-location="gs://define-path-to-your-templates/your-flex-template-name.json" \
--parameters staging_location="gs://your-bucket-path/staging/" \
--parameters temp_location="gs://your-bucket-path/temp/" \
--service-account-email="your-restricted-sa-dataflow@your-project-id.iam.gserviceaccount.com" \
--region="yourRegion" \
--max-workers=6 \
--subnetwork="https://www.googleapis.com/compute/v1/projects/your-project-id/regions/your-region/subnetworks/your-subnetwork" \
--disable-public-ips
Option 2 - Copy the SDK archive from GCS at runtime
According to the Beam documentation you should even be able to provide a gs:// path directly for the sdk_location option, but that didn't work for me. The following should work instead:
Upload the source archive to a bucket, e.g. gs://yourbucketname/beam_sdks/apache-beam-2.28.0.tar.gz, and download it at runtime to a local path such as /tmp/apache-beam-2.28.0.tar.gz before the pipeline options are built:
# see: https://cloud.google.com/storage/docs/samples/storage-download-file
from google.cloud import storage
def download_blob(bucket_name, source_blob_name, destination_file_name):
    """Downloads a blob from the bucket."""
    # bucket_name = "your-bucket-name"  (without the "gs://" prefix)
    # source_blob_name = "storage-object-name"  (object path inside the bucket)
    # destination_file_name = "local/path/to/file"
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    # Construct a client-side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
Then set sdk_location to the local path of the downloaded archive:
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'
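Putting Option 2 together, a rough sketch of how the download and the sdk_location option could be combined in the driver program (the bucket name, object path and placeholder transform are just examples):
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

# Fetch the SDK archive from GCS before the pipeline options are built.
download_blob("your-bucket-name",
              "beam_sdks/apache-beam-2.28.0.tar.gz",
              "/tmp/apache-beam-2.28.0.tar.gz")

options = PipelineOptions()  # plus your Google Cloud / worker / streaming options
# Workers install the Beam SDK from this local archive instead of pypi.org.
options.view_as(SetupOptions).sdk_location = '/tmp/apache-beam-2.28.0.tar.gz'

with beam.Pipeline(options=options) as pipeline:
    # Replace with your real transforms, e.g. ReadFromPubSub | ParDo | WriteToBigQuery.
    pipeline | 'Placeholder' >> beam.Create(['hello'])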
Upvotes: 1