dendog

Reputation: 3338

Importing Google Firestore Python client in Apache beam

I am trying to write a small DoFn to write data from my Dataflow pipeline into Cloud Firestore. Locally everything works as expected, but when I try to run it on Dataflow everything falls apart!

Here is my function:

class FirestoreWriteDoFn(beam.DoFn):
  def __init__(self):
    super(FirestoreWriteDoFn, self).__init__()

  def start_bundle(self):
    import google.cloud 
    self.db = google.cloud.firestore.Client(project='ag-audience')

  def process(self, element):
    fb_data = {
      'topics': element.get('page_keywords').split(','),
      'title': element.get('page_title')
    }
    logging.info('Inserting into Firebase: %s', fb_data)
    fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
    result = fb_doc.create(fb_data)
    yield result

Here is the command with which it is deployed:

$ python pipe/main.py \
  --runner=dataflow \
  --project=ag-audience \
  --region=europe-west1 \
  --machine_type=n1-standard-4 \
  --temp_location=gs://ag-dataflow/tmp \
  --requirements_file requirements.txt \
  --save_main_session \
  --streaming

And here is my requirements.txt:

google-cloud-firestore>=1.3.0

I have tried many things:

- Importing the firestore module globally at the top of the file.
- Importing it in different ways: from y import x, import y.
- Importing it in various parts of the code.

The error is always that something is undefined:

NameError: global name 'google' is not defined [while running 'generatedPtransform-480']
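For reference, one of the variants I tried imports the firestore submodule explicitly inside start_bundle, roughly like this (same project ID as above):

class FirestoreWriteDoFn(beam.DoFn):
  def start_bundle(self):
    # Import on the worker at bundle start, binding the submodule explicitly
    # instead of relying on google.cloud.firestore being resolvable as an attribute.
    from google.cloud import firestore
    self.db = firestore.Client(project='ag-audience')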

EDIT: (adding the pipeline code)

with beam.Pipeline(argv=pipeline_args) as p:

    crawled_features = (p 
      | 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
      | 'DebugLogInput' >> beam.ParDo(LogResults())
      | 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
    )

    firebase_stream = (crawled_features
      | 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
      | 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
    )

    bigquery_stream = (crawled_features 
      | 'ShapeRow' >> beam.Map(ShapeBQRow)
      | 'LogShapedBQRow' >> beam.ParDo(LogResults())
      | 'WriteBigQuery' >> beam.io.WriteToBigQuery(
        table=BIGQUERY_TABLE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

Upvotes: 2

Views: 1403

Answers (1)

zdenulo

Reputation: 346

The issue is the Beam version. There is probably a bug in 2.13.0, but with 2.12.0 it works fine, as reported in Python package errors while running GCP Dataflow. I verified this personally as well.
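For example, downgrading the SDK in the environment you launch the pipeline from (assuming the same environment that runs pipe/main.py) would look roughly like this:

# Downgrade the Beam SDK used to launch the job, then redeploy with the same command as above.
$ pip install 'apache-beam[gcp]==2.12.0'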

Upvotes: 2
