user9773014

Dataflow Error: 'Clients have non-trivial state that is local and unpickleable'

I have a pipeline that I can execute locally without any errors. I used to get this error in my locally run pipeline:

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

I believe I fixed this by downgrading to apache-beam==2.3.0. After that, the pipeline ran perfectly locally.

Now I am using the DataflowRunner, and my requirements.txt file has the following dependencies:

    apache-beam==2.3.0
    google-cloud-bigquery==1.1.0
    google-cloud-core==0.28.1
    google-cloud-datastore==1.6.0
    google-cloud-storage==1.10.0
    protobuf==3.5.2.post1
    pytz==2013.7

but I get this dreaded error again:

    'Clients have non-trivial state that is local and unpickleable.'
     PicklingError: Pickling client objects is explicitly not supported.

How come it's giving me the error with the DataflowRunner but not the DirectRunner? Shouldn't they be using the same dependencies/environment? Any help would be appreciated.

I had read that this is the way to solve it, but when I try it I still get the same error:

    import apache_beam as beam
    from google.cloud import datastore

    class MyDoFn(beam.DoFn):

        def start_bundle(self, process_context):
            self._dsclient = datastore.Client()

        def process(self, context, *args, **kwargs):
            # do stuff with self._dsclient
            pass

from https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3191

My previous reference post where I fixed this locally:

Using start_bundle() in apache-beam job not working. Unpickleable storage.Client()

Thanks in advance!

Upvotes: 8

Views: 12663

Answers (2)

ravellin

Reputation: 26

I've had a similar issue when making Dataflow write a bunch of rows to Bigtable. Setting --save_main_session to False seems to have solved it.
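
For reference, a minimal sketch of turning that option off programmatically (save_main_session is a standard Beam setup option; the rest of the pipeline is assumed):

    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    pipeline_options = PipelineOptions()
    # With save_main_session=False, the main module's globals (including any
    # client objects defined there) are not pickled and shipped to workers.
    pipeline_options.view_as(SetupOptions).save_main_session = False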

Upvotes: 0

Valentyn

Reputation: 565

Initializing unpickleable clients in the start_bundle method is a correct approach, and Beam IOs often follow it; see datastoreio.py as an example. Here is a pipeline that performs a simple operation with the GCS Python client in a DoFn. I ran it on Apache Beam 2.16.0 without issues. If you can still reproduce your issue, please provide additional details.

gcs_client.py file:

    import argparse
    import logging
    import time

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from google.cloud import storage

    class MyDoFn(beam.DoFn):
      def start_bundle(self):
        # Create the client per bundle so it is never pickled with the DoFn.
        self.storage_client = storage.Client()

      def process(self, element):
        bucket = self.storage_client.get_bucket("existing-gcs-bucket")
        blob = bucket.blob(str(int(time.time())))
        blob.upload_from_string("payload")
        return element

    logging.getLogger().setLevel(logging.INFO)
    # Forward unrecognized command-line flags to the pipeline options.
    _, options = argparse.ArgumentParser().parse_known_args()

    pipeline_options = PipelineOptions(options)
    p = beam.Pipeline(options=pipeline_options)
    _ = p | beam.Create([None]) | beam.ParDo(MyDoFn())

    p.run().wait_until_finish()

requirements.txt file:

    google-cloud-storage==1.23.0

command line:

    python -m gcs_client \
        --project=insert_your_project \
        --runner=DataflowRunner \
        --temp_location gs://existing-gcs-bucket/temp/ \
        --requirements_file=requirements.txt \
        --save_main_session

Upvotes: 10
