totooooo

Reputation: 1415

Apache Beam in Python: how to reuse exactly the same transform on another PCollection

Several of my PCollections (that come from different sources) have to be decoded in the same way.

hits = (msgs | 'Parse' >> beam.Map(parse)
    | 'Decode' >> beam.Map(decode_hit))

Then:

dummy_hits = (dummy_msgs | 'Parse' >> beam.Map(parse)
    | 'Decode' >> beam.Map(decode_hit))

It would be really nice if I could reuse the transforms by referring to the names I gave them earlier. I naively tried this:

dummy_hits = (dummy_msgs | 'Parse'
    | 'Decode')

But the pipeline won't build (TypeError: Expected a PTransform object, got Parse).

I thought it would be possible, as the documentation for the pipeline module states: "If same transform instance needs to be applied then the right shift operator should be used to designate new names (e.g. input | "label" >> my_tranform)"

What is the right way to do this? Is it even possible?

Upvotes: 4

Views: 2538

Answers (1)

Alex

Reputation: 5286

Transform names (labels) have to be unique within a pipeline, so you can't refer back to a step by its label. But since your sequence of steps is the same, you probably want to create a composite transform, as described here:

https://beam.apache.org/get-started/wordcount-example/#creating-composite-transforms

So do this:

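import apache_beam as beam

class ParseDecode(beam.PTransform):
  """Composite transform: parse each message, then decode it."""

  def expand(self, pcoll):
    # parse and decode_hit are the functions from the question.
    return (pcoll
            | 'Parse' >> beam.Map(parse)
            | 'Decode' >> beam.Map(decode_hit))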

So that you can do this:

hits = msgs | 'Parse msgs' >> ParseDecode()

and then this:

dummy_hits = dummy_msgs | 'Parse dummy msgs' >> ParseDecode()

Upvotes: 6
