Reputation: 1083
Is it possible to use an if statement in a Beam pipeline to apply a different transform depending on the scenario? For example:
1) Take an input argument (backfill or regular) and, based on that argument, decide whether the pipeline starts with
(p
 | fileio.MatchFiles(known_args.input_bucket)
 | fileio.ReadMatches()
 | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))))
or
p | beam.io.ReadFromText(known_args.input_file_name)
2) If the file name contains a certain country name (e.g. USA), call TransformUSA(beam.DoFn), else call TransformAllCountries(beam.DoFn)
Sorry if this isn't a great question. I haven't seen this covered anywhere else, and I'm trying to make my code modular instead of maintaining separate pipelines.
Upvotes: 2
Views: 2983
Reputation: 11031
It is entirely possible to use an if statement in your pipeline, but remember that whatever the condition tests must be known at pipeline construction time, because the if runs once while the pipeline is being built, not once per element. So, for instance:
with beam.Pipeline(...) as p:
    if known_args.backfill:
        input_pcoll = (p
                       | fileio.MatchFiles(known_args.input_bucket)
                       | fileio.ReadMatches()
                       # FlatMap emits one element per line, matching ReadFromText below
                       | beam.FlatMap(lambda file: file.read_utf8().split('\n')))
    else:
        input_pcoll = (p
                       | beam.io.ReadFromText(known_args.input_file_name))
And then, for your TransformUSA, you would do something like:
    # still inside the `with` block
    if 'USA' in known_args.input_file_name:
        next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
    else:
        next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())
Does that make sense?
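The snippets above assume that known_args already has a backfill attribute. As a rough sketch of one way to get there, the flags could be defined with argparse like this. The flag names and help texts are assumptions made for illustration; only input_bucket and input_file_name come from the question.

```python
import argparse


def parse_args(argv=None):
    """Split command-line flags into pipeline-specific args and the rest."""
    parser = argparse.ArgumentParser()
    # Hypothetical flag: present means "backfill", absent means "regular".
    parser.add_argument('--backfill', action='store_true',
                        help='Read every file matched by --input_bucket.')
    parser.add_argument('--input_bucket', default=None,
                        help='File pattern used in backfill mode.')
    parser.add_argument('--input_file_name', default=None,
                        help='Single file read in regular mode.')
    # Flags argparse does not recognize (e.g. --runner) are returned
    # untouched in the second list, so they can be handed on to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args
```

Calling known_args, pipeline_args = parse_args() before building the pipeline makes both values available at construction time, which is what the if statements need. The leftover pipeline_args would typically be wrapped in PipelineOptions and passed to beam.Pipeline.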
Upvotes: 3