Reputation:
I have a pipeline that I can execute locally without any errors. I used to get this error in my locally run pipeline:
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
I believe I fixed this by downgrading to apache-beam==2.3.0. Then locally it would run perfectly.
Now I am using DataflowRunner, and in the requirements.txt file I have the following dependencies:
apache-beam==2.3.0
google-cloud-bigquery==1.1.0
google-cloud-core==0.28.1
google-cloud-datastore==1.6.0
google-cloud-storage==1.10.0
protobuf==3.5.2.post1
pytz==2013.7
but I get this dreaded error again:
'Clients have non-trivial state that is local and unpickleable.'
PicklingError: Pickling client objects is explicitly not supported.
How come it's giving me the error with DataflowRunner but not DirectRunner? Shouldn't they be using the same dependencies/environment? Any help would be appreciated.
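For reference, here is a minimal sketch (project and bucket names are placeholders) of how a requirements file like the one above is typically passed to the Dataflow runner so that the workers install the same pinned dependencies:
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder flags; --requirements_file is what makes the Dataflow workers
# pip-install the pinned dependencies from requirements.txt.
pipeline_options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--temp_location=gs://my-bucket/temp/',
    '--requirements_file=requirements.txt',
])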
I had read that this is the way to solve it, but when I try it I still get the same error:
import apache_beam as beam
from google.cloud import datastore

class MyDoFn(beam.DoFn):
    def start_bundle(self, process_context):
        self._dsclient = datastore.Client()

    def process(self, context, *args, **kwargs):
        # do stuff with self._dsclient
        pass
from https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191
My previous reference post where I fixed this locally:
Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()
Thanks in advance!
Upvotes: 8
Views: 12663
Reputation: 26
I've had a similar issue when making Dataflow write a bunch of rows to Bigtable. Setting --save_main_session to False seems to have solved it.
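A minimal sketch of turning that option off in code, using Beam's SetupOptions (the runner/project flags here are placeholders):
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Build the pipeline options as usual, then disable pickling of the main session.
pipeline_options = PipelineOptions(['--runner=DataflowRunner', '--project=my-project'])
pipeline_options.view_as(SetupOptions).save_main_session = False
With the main session not pickled, module-level objects such as client instances are not shipped to the workers, so each DoFn has to create its own client in start_bundle (or setup) instead.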
Upvotes: 0
Reputation: 565
Initializing unpickleable clients in the start_bundle method is a correct approach, and Beam IOs often follow that; see datastoreio.py as an example. Here is a pipeline that does a simple operation with a GCS Python client in a DoFn. I ran it on Apache Beam 2.16.0 without issues. If you can still reproduce your issue, please provide additional details.
gcs_client.py file:
import argparse
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import storage


class MyDoFn(beam.DoFn):
    def start_bundle(self):
        # The client is created on the worker when a bundle starts, so it never needs to be pickled.
        self.storage_client = storage.Client()

    def process(self, element):
        bucket = self.storage_client.get_bucket("existing-gcs-bucket")
        blob = bucket.blob(str(int(time.time())))
        blob.upload_from_string("payload")
        return element


logging.getLogger().setLevel(logging.INFO)

# Unrecognized command-line flags (--runner, --project, ...) are passed through to PipelineOptions.
_, options = argparse.ArgumentParser().parse_known_args()
pipeline_options = PipelineOptions(options)
p = beam.Pipeline(options=pipeline_options)
_ = p | beam.Create([None]) | beam.ParDo(MyDoFn())
p.run().wait_until_finish()
requirements.txt file:
google-cloud-storage==1.23.0
command line:
python -m gcs_client \
--project=insert_your_project \
--runner=DataflowRunner \
--temp_location gs://existing-gcs-bucket/temp/ \
--requirements_file=requirements.txt \
--save_main_session
Upvotes: 10