Reputation: 3338
I am trying to write a small DoFn to write data from my Dataflow pipeline into Cloud Firestore. Locally everything works as expected, but when I try to run it on Dataflow everything falls apart!
Here is my function:
class FirestoreWriteDoFn(beam.DoFn):
    def __init__(self):
        super(FirestoreWriteDoFn, self).__init__()

    def start_bundle(self):
        import google.cloud
        self.db = google.cloud.firestore.Client(project='ag-audience')

    def process(self, element):
        fb_data = {
            'topics': element.get('page_keywords').split(','),
            'title': element.get('page_title')
        }
        logging.info('Inserting into Firebase: %s', fb_data)
        fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
        result = fb_doc.create(fb_data)
        yield result
Here is the command with which it is deployed:
$ python pipe/main.py \
--runner=dataflow \
--project=ag-audience \
--region=europe-west1 \
--machine_type=n1-standard-4 \
--temp_location=gs://ag-dataflow/tmp \
--requirements_file requirements.txt \
--save_main_session \
--streaming
And here is my requirements.txt:
google-cloud-firestore>=1.3.0
I have tried many things:
- Importing the firestore module globally at the top of the file.
- Importing it in different ways: from y import x, import y.
- Importing it in various parts of the code (one such variant is sketched below).
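For example, one of the variants looked roughly like this (just a sketch of the attempt, same names as above; the import is done inside start_bundle using the submodule directly):

class FirestoreWriteDoFn(beam.DoFn):
    def start_bundle(self):
        # Variant attempt: import the firestore submodule directly in the
        # bundle setup instead of the top-level google.cloud package.
        from google.cloud import firestore
        self.db = firestore.Client(project='ag-audience')

    def process(self, element):
        fb_data = {
            'topics': element.get('page_keywords').split(','),
            'title': element.get('page_title')
        }
        fb_doc = self.db.document('totallyNotBigtable', element.get('key'))
        yield fb_doc.create(fb_data)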
The errors are always that something is undefined:
NameError: global name 'google' is not defined [while running 'generatedPtransform-480']
EDIT: (adding the pipeline code)
with beam.Pipeline(argv=pipeline_args) as p:
    crawled_features = (p
        | 'ReadPubsubCrawls' >> ReadFromPubSub(topic=PUBSUB_TOPIC_CRAWLED_FEATURES).with_output_types(bytes)
        | 'DebugLogInput' >> beam.ParDo(LogResults())
        | 'JSONParse2' >> beam.Map(lambda x: json.loads(x))
    )
    firebase_stream = (crawled_features
        | 'WriteFirebase' >> beam.ParDo(FirestoreWriteDoFn())
        | 'LogFirebaseWriteResult' >> beam.ParDo(LogResults())
    )
    bigquery_stream = (crawled_features
        | 'ShapeRow' >> beam.Map(ShapeBQRow)
        | 'LogShapedBQRow' >> beam.ParDo(LogResults())
        | 'WriteBigQuery' >> beam.io.WriteToBigQuery(
            table=BIGQUERY_TABLE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
Upvotes: 2
Views: 1403
Reputation: 346
The issue is the Beam SDK version. 2.13.0 appears to have a bug, but with 2.12.0 it works fine, as reported in Python package errors while running GCP Dataflow. I verified this personally as well.
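For reference, the downgrade would look roughly like this (a sketch only; it assumes the pipeline is submitted from a local environment whose installed apache-beam version is the SDK the Dataflow workers end up using):

# Pin the Beam SDK in the environment used to launch the pipeline,
# then resubmit the job as before.
pip install "apache-beam[gcp]==2.12.0"
python pipe/main.py --runner=dataflow ...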
Upvotes: 2