Elon Salfati
Elon Salfati

Reputation: 1687

Merge PCollection with apache_beam

I'm trying to run a pipeline with apache_beam (at the end will get to DataFlow).

The pipeline should look like the following:

enter image description here

I format the data from PubSub, I write raw results to Firestore, I run the ML model, and after I have the results from the ML model I want to update firestore with the ID I got from the first write to FS.

The pipeline code in general looks like this:

with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
    )

    # write the raw results to firestore
    write_results = (
        formated_msgs
        | "Write to FS" >> beam.ParDo(WriteToFS())
        | "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
    )

    # Run the ML model
    ml_results = (
        formated_msgs
        | "ML" >> ML()
        | "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
    )

    # Merge by key and update - HERE IS THE PROBLEM
    (
        (write_results, ml_results) # I want to have the data from both merged by the key at this point
        | "group" >> beam.CoGroupByKey()
        | "log" >> beam.ParDo(LogFn())
    )

I have tried so many ways, but I can't seem to find the correct way to do so. Any ideas?

--- update 1 ---

The problem is that on the log line I don't get anything. Sometimes, I even get a timeout on the operation. It might be important to note that I'm streaming the data from PubSub at the beginning.

Upvotes: 1

Views: 441

Answers (1)

Elon Salfati
Elon Salfati

Reputation: 1687

OK, so I finally figured it out. The only thing I was missing is Windowing, I assume since I'm streaming the data.

So I've added the following:

with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
        | "Windowing" >> beam.WindowInto(window.FixedWindows(30))
    )

Upvotes: 1

Related Questions