Several of my PCollections (that come from different sources) have to be decoded in the same way.
hits = (msgs | 'Parse' >> beam.Map(parse)
             | 'Decode' >> beam.Map(decode_hit))
Then:
dummy_hits = (dummy_msgs | 'Parse' >> beam.Map(parse)
                         | 'Decode' >> beam.Map(decode_hit))
It would be really nice if I could reuse the transforms by the names I gave them earlier. I naively tried this:
dummy_hits = (dummy_msgs | 'Parse'
                         | 'Decode')
But my pipeline won't build (TypeError: Expected a PTransform object, got Parse).
I thought this would be possible, since the documentation for the pipeline module states: "If same transform instance needs to be applied then the right shift operator should be used to designate new names (e.g. input | "label" >> my_tranform)".
What's the right way to do this? Is it even possible?
Step names (labels) have to be unique within a pipeline, but since your sequence of steps is the same, you may want to create a composite transform, as described here:
https://beam.apache.org/get-started/wordcount-example/#creating-composite-transforms
So do this:
import apache_beam as beam

class ParseDecode(beam.PTransform):
    """Applies the shared Parse -> Decode steps to any PCollection."""

    def expand(self, pcoll):
        return (pcoll
                | 'Parse' >> beam.Map(parse)
                | 'Decode' >> beam.Map(decode_hit))
Then you can do this:
hits = (msgs | 'Parse msgs' >> ParseDecode())
and then this:
dummy_hits = (dummy_msgs | 'Parse dummy msgs' >> ParseDecode())