Reputation: 93
I'm using the Airflow PythonOperator to execute a Python Beam job with the Dataflow runner.
The Dataflow job fails with the error "ModuleNotFoundError: No module named 'airflow'".
In the Dataflow UI, the SDK version is 2.15.0 when the job is launched through the PythonOperator, and 2.23.0 when it is launched from Cloud Shell. The job works when launched from Cloud Shell.
The environment details for Composer are:
Image version = composer-1.10.3-airflow-1.10.3
Python version = 3
A previous post suggested using the PythonVirtualenvOperator. I tried this with the settings:
requirements=['apache-beam[gcp]'],
python_version=3
Composer returns the error "'install', 'apache-beam[gcp]']' returned non-zero exit status 2."
Any advice would be greatly appreciated.
This is the DAG that calls the Dataflow job. I have not shown all the functions used in the DAG, but I kept the imports in:
import logging
import pprint
import json
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.models import DAG
import google.cloud.logging
from datetime import timedelta
from airflow.utils.dates import days_ago
from deps import utils
from google.cloud import storage
from airflow.exceptions import AirflowException
from deps import logger_montr
from deps import dataflow_clean_csv
dag = DAG(dag_id='clean_data_file',
          default_args=args,
          description='Runs Dataflow to clean csv files',
          schedule_interval=None)
def get_values_from_previous_dag(**context):
    # Copy every key/value from the triggering DAG run's conf into XCom
    var_dict = {}
    for key, val in context['dag_run'].conf.items():
        context['ti'].xcom_push(key, val)
        var_dict[key] = val
populate_ti_xcom = PythonOperator(
    task_id='get_values_from_previous_dag',
    python_callable=get_values_from_previous_dag,
    provide_context=True,
    dag=dag,
)
dataflow_clean_csv = PythonOperator(
    task_id="dataflow_clean_csv",
    python_callable=dataflow_clean_csv.clean_csv_dataflow,
    op_kwargs={
        'project':
        'zone':
        'region':
        'stagingLocation':
        'inputDirectory':
        'filename':
        'outputDirectory':
    },
    provide_context=True,
    dag=dag,
)
populate_ti_xcom >> dataflow_clean_csv
I use the ti.xcom_pull(task_ids = 'get_values_from_previous_dag') method to assign the op_kwargs.
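For illustration, that lookup could be sketched as below. `FakeTaskInstance` and `build_op_kwargs` are hypothetical names, and the fake class is only a stand-in for Airflow's task instance so the snippet runs without Airflow. Because the upstream task pushes each value under its own key, the sketch assumes the pull also passes `key=`.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance (illustration only)."""

    def __init__(self):
        self._store = {}

    def xcom_push(self, key, value):
        self._store[key] = value

    def xcom_pull(self, task_ids=None, key="return_value"):
        # The real method also filters by task_ids; this stand-in ignores it.
        return self._store.get(key)


def build_op_kwargs(ti, keys):
    # Pull one XCom value per key pushed by the upstream task.
    return {
        k: ti.xcom_pull(task_ids="get_values_from_previous_dag", key=k)
        for k in keys
    }


ti = FakeTaskInstance()
ti.xcom_push("project", "my-project")
ti.xcom_push("region", "us-central1")
kwargs = build_op_kwargs(ti, ["project", "region"])
```

With the real task instance, a pull without `key=` looks up the task's return value rather than the individually pushed keys.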
This is the Dataflow job that is being called:
import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText
def parse_file(element):
    # Parse one CSV line, strip embedded double quotes, and re-quote every field
    for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL):
        line = [s.replace('\"', '') for s in line]
        clean_line = '","'.join(line)
        final_line = '"' + clean_line + '"'
        return final_line
def clean_csv_dataflow(**kwargs):
    argv = [
        # Dataflow pipeline options
        "--region={}".format(kwargs["region"]),
        "--project={}".format(kwargs["project"]),
        "--temp_location={}".format(kwargs["stagingLocation"]),
        # Setting Dataflow pipeline options
        '--save_main_session',
        '--max_num_workers=8',
        '--autoscaling_algorithm=THROUGHPUT_BASED',
        # Mandatory constants
        '--job_name=cleancsvdataflow',
        '--runner=DataflowRunner'
    ]
    options = PipelineOptions(
        flags=argv
    )
    inputDirectory = kwargs["inputDirectory"]
    filename = kwargs["filename"]
    outputDirectory = kwargs["outputDirectory"]
    # Insert "_CLEANED" before the extension, e.g. "data.csv" -> "data_CLEANED.csv"
    outputfile_temp = filename
    outputfile_temp = outputfile_temp.split(".")
    outputfile = "_CLEANED.".join(outputfile_temp)
    in_path_and_filename = "{}{}".format(inputDirectory, filename)
    out_path_and_filename = "{}{}".format(outputDirectory, outputfile)
    # Build the pipeline once, after all paths are resolved
    pipeline = beam.Pipeline(options=options)
    clean_csv = (pipeline
        | "Read input file" >> beam.io.ReadFromText(in_path_and_filename)
        | "Parse file" >> beam.Map(parse_file)
        | "writecsv" >> beam.io.WriteToText(out_path_and_filename, num_shards=1)
    )
    pipeline.run()
Upvotes: 3
Views: 12070
Reputation: 3903
This answer was provided by @BSpinoza in the comment section:
What I did was move all imports from the global namespace and place them into the function definitions. Then, from the calling DAG, I used the BashOperator. It worked.
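A minimal sketch of that pattern, not the exact code from the comment: the imports sit inside the functions, and a helper builds the shell command that a BashOperator could run. `build_bash_command`, the reshaped `clean_csv_dataflow` signature, and the script path are hypothetical and chosen for illustration. The Beam imports are resolved only when `clean_csv_dataflow` is actually called.

```python
def parse_file(element):
    # Imported inside the function, so nothing is added to the module's
    # global namespace.
    import csv

    for line in csv.reader([element], quotechar='"', delimiter=','):
        line = [s.replace('"', '') for s in line]
        return '"' + '","'.join(line) + '"'


def clean_csv_dataflow(argv, in_path, out_path):
    # Beam is imported only when the job is actually launched.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    pipeline = beam.Pipeline(options=PipelineOptions(flags=argv))
    (pipeline
     | "Read input file" >> beam.io.ReadFromText(in_path)
     | "Parse file" >> beam.Map(parse_file)
     | "writecsv" >> beam.io.WriteToText(out_path, num_shards=1))
    pipeline.run()


def build_bash_command(script_path, options):
    # Hypothetical helper: turns an options dict into a shell-safe command
    # string that could be passed to a BashOperator as bash_command.
    import shlex

    parts = ["python", script_path]
    parts += ["--{}={}".format(k, v) for k, v in sorted(options.items())]
    return " ".join(shlex.quote(p) for p in parts)
```

For example, `build_bash_command("job.py", {"region": "r1", "project": "p1"})` produces `python job.py --project=p1 --region=r1`. The script itself would still need to parse those flags and call `clean_csv_dataflow`.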
Another recommended approach is to use the DataFlowPythonOperator.
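As a rough sketch of what that could look like on Airflow 1.10: the helper below only assembles keyword arguments for `DataFlowPythonOperator`, so it runs without Airflow installed. The helper name, the `py_file` path, and the option values are assumptions for illustration. The option keys are meant to mirror the Beam Python flags used in the question, and the values are kept as strings on the assumption that they are passed through as command-line flags.

```python
def dataflow_python_operator_kwargs(project, region, temp_location, py_file):
    # Hypothetical helper: collects the arguments in one place so they can
    # be inspected or tested without importing Airflow.
    return {
        "task_id": "dataflow_clean_csv",
        "py_file": py_file,
        "dataflow_default_options": {
            "project": project,
            "region": region,
            "temp_location": temp_location,
        },
        "options": {
            "max_num_workers": "8",
            "autoscaling_algorithm": "THROUGHPUT_BASED",
        },
    }


operator_kwargs = dataflow_python_operator_kwargs(
    project="my-project",
    region="us-central1",
    temp_location="gs://my-bucket/tmp",
    py_file="/home/airflow/gcs/dags/deps/dataflow_clean_csv.py",
)

# In the DAG file (requires Airflow, so shown here as a comment):
# from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator
# dataflow_task = DataFlowPythonOperator(dag=dag, **operator_kwargs)
```

With this operator the pipeline file is run as a standalone script rather than imported into the DAG, so the script would need its own entry point that reads the flags.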
Upvotes: 1