Reputation: 1687
I'm trying to run a pipeline with apache_beam (eventually it will run on Dataflow).
The pipeline should look like the following:
I format the data from PubSub, write the raw results to Firestore, run the ML model, and once I have the results from the ML model I want to update Firestore with the ID I got from the first write to FS.
The pipeline code in general looks like this:
with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
    )

    # write the raw results to Firestore
    write_results = (
        formated_msgs
        | "Write to FS" >> beam.ParDo(WriteToFS())
        | "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
    )

    # run the ML model
    ml_results = (
        formated_msgs
        | "ML" >> ML()
        | "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
    )

    # merge by key and update - HERE IS THE PROBLEM
    (
        (write_results, ml_results)  # I want the data from both branches merged by key at this point
        | "group" >> beam.CoGroupByKey()
        | "log" >> beam.ParDo(LogFn())
    )
I have tried so many ways, but I can't seem to find the correct way to do so. Any ideas?
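For reference, beam.CoGroupByKey() emits, for each key, a tuple of iterables, one per input PCollection, in the order the inputs were passed. A plain-Python sketch of that grouping (the record contents and record_uuid values here are made up for illustration):

```python
from collections import defaultdict

def cogroup_by_key(write_results, ml_results):
    """Mimic beam.CoGroupByKey() over two keyed lists:
    emit {key: (fs_values, ml_values)} per distinct key."""
    fs_by_key = defaultdict(list)
    ml_by_key = defaultdict(list)
    for key, value in write_results:
        fs_by_key[key].append(value)
    for key, value in ml_results:
        ml_by_key[key].append(value)
    # every key seen in either input appears in the output
    return {
        key: (fs_by_key[key], ml_by_key[key])
        for key in set(fs_by_key) | set(ml_by_key)
    }

# Hypothetical records keyed by record_uuid, shaped like the
# output of the "Key FS" / "Key ML" steps above
write_results = [("uuid-1", {"record_uuid": "uuid-1", "fs_id": "doc-42"})]
ml_results = [("uuid-1", {"record_uuid": "uuid-1", "score": 0.9})]

merged = cogroup_by_key(write_results, ml_results)
# merged["uuid-1"] -> ([{"record_uuid": "uuid-1", "fs_id": "doc-42"}],
#                      [{"record_uuid": "uuid-1", "score": 0.9}])
```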
--- update 1 ---
The problem is that nothing ever reaches the log step. Sometimes I even get a timeout on the operation. It might be important to note that I'm streaming the data from PubSub at the beginning.
Upvotes: 1
Views: 441
Reputation: 1687
OK, so I finally figured it out. The only thing I was missing was windowing, presumably because I'm streaming the data: on an unbounded source, CoGroupByKey can't emit anything until a window closes, so without windowing the grouped results never arrive.
So I've added the following:
with beam.Pipeline(options=options) as p:
    # read, format, and window
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
        | "Windowing" >> beam.WindowInto(window.FixedWindows(30))
    )
Upvotes: 1