Claaass

Reputation: 31

Google Dataflow: Import custom Python module

I'm trying to run an Apache Beam pipeline (Python) within Google Cloud Dataflow, triggered by a DAG in Google Cloud Composer.

The structure of my dags folder in the respective GCS bucket is as follows:

/dags/
  dataflow.py <- DAG
  dataflow/
    pipeline.py <- pipeline
    setup.py
    my_modules/
      __init__.py
      commons.py <- the module I want to import in the pipeline

The setup.py is very basic, but in line with the Apache Beam docs and answers on SO:

import setuptools

setuptools.setup(packages=setuptools.find_packages())
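
For comparison, a slightly fuller setup.py sketch would look like the following; the name and version values are placeholders, only find_packages() comes from the setup described above:

import setuptools

setuptools.setup(
    name='my_modules',        # placeholder package name
    version='0.0.1',          # placeholder version
    packages=setuptools.find_packages(),  # picks up my_modules/ via its __init__.py
)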

In the DAG file (dataflow.py) I set the setup_file option and pass it to Dataflow:

default_dag_args = {
    ... ,
    'dataflow_default_options': {
        ... ,
        'runner': 'DataflowRunner',
        'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
    }
}

Within the pipeline file (pipeline.py) I try to use

from my_modules import commons

but this fails. The log in Google Cloud Composer (Apache Airflow) says:

gcp_dataflow_hook.py:132} WARNING - b'  File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n    from my_modules import commons\n           ^\nSyntaxError: invalid syntax'
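
(For context, a minimal pipeline.py along these lines could look like the sketch below; the Create input and the commons.some_function helper are illustrative placeholders, only the from my_modules import commons import and the standard Beam option parsing match what is described above.)

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from my_modules import commons  # the import that fails


def run(argv=None):
    # The options set in the DAG (runner, setup_file, ...) arrive as
    # command-line arguments and are parsed by Beam here.
    options = PipelineOptions(argv)

    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['a', 'b', 'c'])          # illustrative input
         | 'UseCommons' >> beam.Map(commons.some_function))  # hypothetical helper in commons.py


if __name__ == '__main__':
    run()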

The basic idea behind the setup.py file is documented here

Also, there are similar questions on SO which helped me:

Google Dataflow - Failed to import custom python modules

Dataflow/apache beam: manage custom module dependencies

I'm actually wondering why my pipeline fails with a SyntaxError and not a module-not-found kind of error...

Upvotes: 3

Views: 2941

Answers (1)

I tried to reproduce your issue and then solve it, so I created the same folder structure you already have:

/dags/
  dataflow.py
  dataflow/
     pipeline.py -> pipeline
     setup.py
     my_modules/
        __init__.py
commons.py

Therefore, to make it work, the change I made was to copy these folders to a place where the instance running the code is able to find them, for example the /tmp/ folder of the instance.

So, my DAG would be something like this:

1 - First of all, I declare my arguments:

from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(xxxx, x, x),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'dataflow_default_options': {
        'project': '<project>',
        'region': '<region>',
        'stagingLocation': 'gs://<bucket>/stage',
        'tempLocation': 'gs://<bucket>/temp',
        'setup_file': <setup.py>,
        'runner': 'DataflowRunner'
    }
}

2 - After this, I created the DAG: before running the Dataflow task, I copy the whole folder created above into the /tmp/ folder of the instance (task t1), and then I run the pipeline from the /tmp/ directory (task t2):

import subprocess

from airflow import DAG
from airflow.operators import python_operator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

with DAG(
        'composer_df',
        default_args=default_args,
        description='dataflow dag',
        schedule_interval="xxxx") as dag:

    def copy_dependencies():
        # Copy the dags folder from GCS to the local /tmp/ directory of the instance
        process = subprocess.Popen(
            ['gsutil', 'cp', '-r', 'gs://<bucket>/dags/*', '/tmp/'])
        process.communicate()

    t1 = python_operator.PythonOperator(
        task_id='copy_dependencies',
        python_callable=copy_dependencies,
        provide_context=False
    )

    t2 = DataFlowPythonOperator(
        task_id="composer_dataflow",
        py_file='/tmp/dataflow/pipeline.py',
        job_name='job_composer')

    t1 >> t2

That's how I created the DAG file dataflow.py; then, in pipeline.py, the import of the package looks like this:

from my_modules import commons

It should work fine, since the folder structure is now reachable by the VM that runs the code.
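
Optionally, as a quick sanity check (a hypothetical extra step, not required for the fix), another task placed between t1 and t2 could try the import from the copied folder before the Dataflow job is launched:

import subprocess

def check_import():
    # Hypothetical check: verify that my_modules resolves from the
    # /tmp/dataflow copy before the Dataflow task runs.
    subprocess.check_call([
        'python', '-c',
        'import sys; sys.path.insert(0, "/tmp/dataflow"); '
        'from my_modules import commons; print(commons)'
    ])

Wrapped in another PythonOperator, this fails fast with an import error if the copy step did not put the package where the pipeline expects it.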

Upvotes: 3
