WIT

Reputation: 1083

If statement for steps in an Apache Beam Dataflow pipeline (Python)

I was wondering if it is possible to have an if statement in a beam pipeline for enacting a different transform based on different scenarios. For example:

1) Add an input argument (backfill/regular) and, based on its value, decide whether to start with

(p 
            | fileio.MatchFiles(known_args.input_bucket)
            | fileio.ReadMatches()
            | beam.Map(lambda file: (file.metadata.path, json.loads(file.read_utf8()))))

or

p | beam.io.ReadFromText(known_args.input_file_name)

2) If the file name contains a certain country name (e.g. USA), call TransformUSA(beam.DoFn), else call TransformAllCountries(beam.DoFn)

Sorry if this isn't a great question. I haven't seen this anywhere else, and I am trying to make my code modular instead of having separate pipelines.

Upvotes: 2

Views: 2983

Answers (1)

Pablo

Reputation: 11031

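Yes, you can use an if statement in your pipeline code, but the condition has to be something known at pipeline construction time, such as a command-line argument. It cannot depend on the contents of a PCollection, because those only exist once the pipeline runs. So, for instance: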

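import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline(...) as p:
  # known_args.backfill is an ordinary Python value, so this branch is
  # resolved once, while the pipeline graph is being built.
  if known_args.backfill:
    input_pcoll = (p
                   | fileio.MatchFiles(known_args.input_bucket)
                   | fileio.ReadMatches()
                   # FlatMap emits one element per line, matching ReadFromText.
                   | beam.FlatMap(lambda file: file.read_utf8().splitlines()))
  else:
    input_pcoll = (p
                   | beam.io.ReadFromText(known_args.input_file_name))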
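
For the `known_args.backfill` check to work, the flag has to be parsed before the pipeline is built. Here is a minimal sketch of one way to do that with `argparse`. The flag names mirror the attributes used above, while the `parse_args` helper and the example bucket pattern are hypothetical:

```python
import argparse


def parse_args(argv=None):
    """Split the command line into pipeline-construction options and the rest."""
    parser = argparse.ArgumentParser()
    # store_true makes --backfill a boolean switch: absent -> False, present -> True.
    parser.add_argument("--backfill", action="store_true",
                        help="Read every file matching --input_bucket.")
    parser.add_argument("--input_bucket", help="File pattern used in backfill mode.")
    parser.add_argument("--input_file_name", help="Single file read in regular mode.")
    # Flags this parser does not know (e.g. --runner) come back separately,
    # so they can be forwarded to Beam's PipelineOptions.
    return parser.parse_known_args(argv)


# Hypothetical command line, for illustration only.
known_args, pipeline_args = parse_args(
    ["--backfill", "--input_bucket", "gs://my-bucket/*.json", "--runner", "DirectRunner"])
print(known_args.backfill, pipeline_args)
```

Because `--backfill` is a plain boolean by the time the pipeline is constructed, `if known_args.backfill:` is an ordinary Python branch and Beam only ever sees the steps from the branch that was taken.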

And then, for your TransformUSA, you would do something like:

if 'USA' in known_args.input_file_name:
  next_pcoll = input_pcoll | beam.ParDo(TransformUSA())
else:
  next_pcoll = input_pcoll | beam.ParDo(TransformAllCountries())
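
If you want to keep the pipeline body free of branching, the same decision could be moved into a small helper. This is only a sketch: `pick_country_dofn` is a hypothetical name, the two classes below are stand-ins for your real `beam.DoFn` subclasses, and matching on the base name rather than the full path is an assumption about what you want:

```python
import os


def pick_country_dofn(file_name, usa_dofn, default_dofn):
    """Return the DoFn class to use, decided once while the pipeline is built."""
    # Assumption: only the file's base name matters, so a directory such as
    # "USA_archive/" does not select the USA branch by accident.
    base_name = os.path.basename(file_name)
    return usa_dofn if "USA" in base_name else default_dofn


# Stand-ins for the real beam.DoFn subclasses from the question.
class TransformUSA:
    pass


class TransformAllCountries:
    pass


# Hypothetical file name, for illustration only.
chosen = pick_country_dofn("gs://bucket/sales_USA.json",
                           TransformUSA, TransformAllCountries)
print(chosen.__name__)
```

In the pipeline this would be used as `input_pcoll | beam.ParDo(pick_country_dofn(known_args.input_file_name, TransformUSA, TransformAllCountries)())`. The choice is still made at construction time, from an argument rather than from the data.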

Does that make sense?

Upvotes: 3
