Reputation: 31
I am trying to run an Apache Beam pipeline (Python) on Google Cloud Dataflow, triggered by a DAG in Google Cloud Composer.
The structure of my dags folder in the respective GCS bucket is as follows:
/dags/
    dataflow.py              <- DAG
    dataflow/
        pipeline.py          <- pipeline
        setup.py
        my_modules/
            __init__.py
            commons.py       <- the module I want to import in the pipeline
The setup.py is very basic, but it follows the Apache Beam docs and answers on SO:
import setuptools
# setup() accepts keyword arguments only, so the package list is passed as packages=
setuptools.setup(packages=setuptools.find_packages())
In the DAG file (dataflow.py) I set the setup_file option and pass it to Dataflow:
default_dag_args = {
    ... ,
    'dataflow_default_options': {
        ... ,
        'runner': 'DataflowRunner',
        'setup_file': os.path.join(configuration.get('core', 'dags_folder'), 'dataflow', 'setup.py')
    }
}
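As a rough illustration of the path handling above, the sketch below builds the same two options from a given DAGs folder and can optionally check that setup.py exists before the options are handed to Dataflow. The helper name `build_dataflow_options` and its `check_exists` flag are hypothetical, and the folder passed in the usage line is only the path that appears in the log output of this question.

```python
import os


def build_dataflow_options(dags_folder, check_exists=False):
    """Return Dataflow options whose setup_file is <dags_folder>/dataflow/setup.py.

    Hypothetical helper for illustration; not part of Airflow or Beam.
    """
    setup_file = os.path.join(dags_folder, 'dataflow', 'setup.py')
    # Optionally fail early if the file is not where the DAG expects it.
    if check_exists and not os.path.isfile(setup_file):
        raise FileNotFoundError('setup.py not found at ' + setup_file)
    return {
        'runner': 'DataflowRunner',
        'setup_file': setup_file,
    }


# Usage: the folder below is an assumption taken from the log path.
options = build_dataflow_options('/home/airflow/gcs/dags')
print(options['setup_file'])
```

Checking for the file on the Airflow side only confirms that the path is right there; it says nothing about what the Dataflow workers will see.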
Within the pipeline file (pipeline.py) I try to use
from my_modules import commons
but this fails. The log in Google Cloud Composer (Apache Airflow) says:
gcp_dataflow_hook.py:132} WARNING - b' File "/home/airflow/gcs/dags/dataflow/dataflow.py", line 11\n from my_modules import commons\n ^\nSyntaxError: invalid syntax'
The basic idea behind the setup.py file is documented here
Also, there are similar questions on SO which helped me:
Google Dataflow - Failed to import custom python modules
Dataflow/apache beam: manage custom module dependencies
I'm actually wondering why my pipeline fails with a SyntaxError and not a "module not found" kind of error...
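On the difference between the two error types: Python raises SyntaxError while parsing a file, before any import is attempted, whereas a missing module only surfaces as ModuleNotFoundError once the import statement actually runs. The sketch below shows both cases in isolation. It is not a diagnosis of the failure above; the unclosed bracket is just one assumed way for an otherwise valid import line to be reported as invalid syntax, and `classify_failure` and the missing module name are hypothetical.

```python
def classify_failure(source):
    """Compile and run `source`; return the name of the error raised, or 'ok'."""
    try:
        code = compile(source, '<example>', 'exec')  # parsing happens here
    except SyntaxError:
        return 'SyntaxError'
    try:
        exec(code, {})  # imports are resolved only when the code runs
    except ModuleNotFoundError:
        return 'ModuleNotFoundError'
    return 'ok'


# Valid syntax, but the (hypothetical) module does not exist.
missing_module = "from module_that_does_not_exist_xyz import commons\n"

# The import line is fine on its own, but an unclosed bracket on the
# previous line makes the parser reject the file.
broken_syntax = "options = {'runner': 'DataflowRunner'\nfrom my_modules import commons\n"

print(classify_failure(missing_module))
print(classify_failure(broken_syntax))
```

So a SyntaxError pointing at an import line suggests that the interpreter could not parse the file at all, which is a separate problem from whether my_modules can be found.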
Upvotes: 3
Views: 2941
Reputation: 416
I tried to reproduce your issue and then solve it, so I created the same folder structure you already have:
/dags/
    dataflow.py
    dataflow/
        pipeline.py          <- pipeline
        setup.py
        my_modules/
            __init__.py
            commons.py
To make it work, the change I made was to copy these folders to a place where the instance running the code is able to find them, for example the /tmp/ folder of the instance.
So, my DAG looks something like this:
1 - First of all, I declare my arguments:
from datetime import datetime, timedelta

default_args = {
    'start_date': datetime(xxxx, x, x),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'dataflow_default_options': {
        'project': '<project>',
        'region': '<region>',
        'stagingLocation': 'gs://<bucket>/stage',
        'tempLocation': 'gs://<bucket>/temp',
        'setup_file': <setup.py>,
        'runner': 'DataflowRunner'
    }
}
2 - After this, I created the DAG. Before running the Dataflow task, I copy the whole directory created above into the /tmp/ folder of the instance (task t1), and then I run the pipeline from the /tmp/ directory (task t2):
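# Airflow 1.x import paths
import subprocess
from airflow import DAG
from airflow.operators import python_operator
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

with DAG(
        'composer_df',
        default_args=default_args,
        description='dataflow dag',
        schedule_interval="xxxx") as dag:

    def copy_dependencies():
        # Copy everything under the bucket's dags/ folder to /tmp/ on the worker
        process = subprocess.Popen(['gsutil', 'cp', '-r', 'gs://<bucket>/dags/*', '/tmp/'])
        process.communicate()

    t1 = python_operator.PythonOperator(
        task_id='copy_dependencies',
        python_callable=copy_dependencies,
        provide_context=False
    )

    t2 = DataFlowPythonOperator(
        task_id="composer_dataflow",
        py_file='/tmp/dataflow/pipeline.py',
        job_name='job_composer')

    t1 >> t2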
That's how I created the DAG file dataflow.py. Then, in pipeline.py, the import looks like this:
from my_modules import commons
It should work fine, since the directory structure is now visible to the VM that runs the code.
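One thing worth considering for the copy step: Popen followed by communicate() does not raise if gsutil fails, so the Dataflow task could start without its files. Below is a minimal sketch of a stricter variant that uses subprocess.run with check=True. The `build_copy_command` helper, the `runner` parameter (there only so the function can be exercised without gsutil installed) and the bucket name in the usage comment are assumptions for illustration.

```python
import subprocess


def build_copy_command(bucket, destination='/tmp/'):
    """Build the gsutil command that copies gs://<bucket>/dags/* into `destination`."""
    return ['gsutil', 'cp', '-r', 'gs://{}/dags/*'.format(bucket), destination]


def copy_dependencies(bucket, destination='/tmp/', runner=subprocess.run):
    """Run the copy; with check=True a non-zero gsutil exit raises CalledProcessError."""
    command = build_copy_command(bucket, destination)
    # A raised exception makes the Airflow task fail, so the downstream
    # Dataflow task is not started against a missing or partial copy.
    return runner(command, check=True)


# Usage inside the DAG (bucket name is a placeholder):
#   python_callable=lambda: copy_dependencies('<bucket>')
```

Keeping the command construction in its own function makes it easy to log or inspect the exact gsutil call when the task fails.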
Upvotes: 3