Ravi Ranjan

Reputation: 62

Can we use a for loop to create an Apache Beam Dataflow pipeline dynamically?

Can we use a for loop to create an Apache Beam Dataflow pipeline dynamically? My fear is how the for loop will behave in a distributed environment when I use it with the Dataflow runner. I am sure this will work fine with the direct runner.

For example, can I create a pipeline dynamically like this:

with beam.Pipeline(options=pipeline_options) as pipeline:
    for p in cdata['tablelist']:
        i_file_path = p['sourcefile']
        schemauri = p['schemauri']
        schema = getschema(schemauri)
        dest_table_id = p['targettable']

        (   pipeline | "Read From Input Datafile" + dest_table_id >> beam.io.ReadFromText(i_file_path)
                     | "Convert to Dict" + dest_table_id >> beam.Map(lambda r: data_ingestion.parse_method(r))
                     | "Write to BigQuery Table" + dest_table_id >> beam.io.WriteToBigQuery(
                            '{0}:{1}'.format(project_name, dest_table_id),
                            schema=schema,
                            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

Upvotes: 0

Views: 916

Answers (2)

robertwb

Reputation: 5104

Yes, this is totally legal, and lots of pipelines (especially ML ones) are constructed this way. Your looped pipeline construction above should work just fine on all runners.

You can think of a Beam pipeline as having two phases: construction and execution. The first phase, construction, happens entirely in the main program and can have arbitrary loops, control statements, etc. Behind the scenes, this builds up a DAG of deferred operations (such as reads, maps, etc.) to perform. If you have a loop, each iteration will simply append more operations to this graph. The only thing you can't do in this phase is inspect the data (i.e. the contents of a PCollection) itself.

The second phase, execution, starts when pipeline.run() is invoked. (For Python, this is implicitly invoked on exiting the with block). At this point the pipeline graph (as constructed above), its dependencies, pipeline options, etc. are passed to a Runner which will actually execute the fully-specified graph, ideally in parallel.
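A toy sketch to make the two phases concrete (the Create/Map/print steps are just placeholders): the loop only appends three independent, uniquely-labeled branches to the graph during construction, and nothing is read or processed until run() is called.

import apache_beam as beam

pipeline = beam.Pipeline()  # construction phase: we are only building the graph

for i in range(3):
    # Each iteration appends another branch to the DAG; note the unique labels.
    (   pipeline | f"Create {i}" >> beam.Create([i, i + 10])
                 | f"Double {i}" >> beam.Map(lambda x: x * 2)
                 | f"Print {i}"  >> beam.Map(print)
    )

result = pipeline.run()        # execution phase starts here
result.wait_until_finish()     # (a `with` block does both of these for you)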

This is covered a bit in the programming guide, though I agree it could be clearer.

Upvotes: 3

Mazlum Tosun

Reputation: 6572

I think it's not possible.

There are several other solutions to do that.

If you have an orchestrator like Cloud Composer/Airflow or Cloud Workflows, you can put this logic inside the orchestrator and instantiate and launch a Dataflow job per element of the loop:

Solution 1, example with Airflow:

from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from airflow.operators.dummy import DummyOperator

# Single END task shared by all Dataflow tasks, so they can run in parallel
end = DummyOperator(task_id='END', dag=dag)

for p in cdata['tablelist']:
      i_file_path = p['sourcefile']
      schemauri = p['schemauri']
      dest_table_id = p['targettable']

      options = {
          'i_file_path': i_file_path,
          'dest_table_id': dest_table_id,
          'schemauri' : schemauri,
          ...
      }

      dataflow_task = DataflowCreatePythonJobOperator(
          py_file=beam_main_file_path,
          task_id=f'task_{dest_table_id}',
          dataflow_default_options=your_default_options,
          options=options,
          gcp_conn_id="google_cloud_default",
          dag=dag
      )

      # Each Dataflow job created in the loop can execute in parallel,
      # all fanning in to the single END task
      dataflow_task >> end

Solution 2, with a shell script:

for module_path in ${tablelist}; do
   # Options
   i_file_path=...
   schemauri=...
   dest_table_id=...

   # Python command to launch the Dataflow job
   python -m your_module.main \
        --runner=DataflowRunner \
        --staging_location=gs://your_staging_location/ \
        --temp_location=gs://your_temp_location/ \
        --region=europe-west1 \
        --setup_file=./setup.py \
        --i_file_path=$i_file_path \
        --schemauri=$schemauri \
        --dest_table_id=$dest_table_id
done

In this case, the Dataflow jobs are executed sequentially.

If you have too many files and Dataflow jobs to launch, you can think about another solution: with a shell script or a Cloud Function, get all the needed files, rename them as expected (with the metadata in the filename), and move them to a separate location in GCS.
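For example, a rough sketch of that renaming step with gsutil (the bucket paths and the <dest_table_id>__<filename> naming convention are only examples):

for p in ${tablelist}; do
   src_file=...        # original object name
   dest_table_id=...   # metadata to carry in the filename

   # Move and rename the file so the target table is encoded in its name
   gsutil mv "gs://your_bucket/incoming/${src_file}" \
             "gs://your_bucket/staging/${dest_table_id}__${src_file}"
done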

Then, in a single Dataflow job (sketched after this list):

  • read all the previous files via a pattern
  • parse the metadata from the filename, like schemauri and dest_table_id
  • apply the map operation in the job on the current element
  • write the result to BigQuery
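A rough sketch of that single job (the gs://your_bucket/staging/*.csv pattern, the <dest_table_id>__<filename> convention, the CSV layout and the dataset name are only examples; adapt the parsing and the schema to your own files):

import apache_beam as beam
from apache_beam.io import fileio

def lines_with_table(readable_file):
    # The target table is parsed from the filename, e.g. .../staging/table_a__data.csv
    dest_table_id = readable_file.metadata.path.split('/')[-1].split('__')[0]
    for line in readable_file.read_utf8().splitlines():
        # Assumed CSV layout "name,value"; your real parse_method logic goes here
        name, value = line.split(',')
        yield {'dest_table_id': dest_table_id, 'name': name, 'value': int(value)}

with beam.Pipeline(options=pipeline_options) as pipeline:
    (   pipeline | "Match files" >> fileio.MatchFiles('gs://your_bucket/staging/*.csv')
                 | "Read matches" >> fileio.ReadMatches()
                 | "Parse with metadata" >> beam.FlatMap(lines_with_table)
                 | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                        # Dynamic destination: the table is chosen per element
                        table=lambda row: '{0}:dataset.{1}'.format(project_name, row['dest_table_id']),
                        schema='dest_table_id:STRING,name:STRING,value:INTEGER',
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )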

If you don't have a huge number of files, the first two solutions are simpler.

Upvotes: 1
