BSpinoza

Reputation: 93

ModuleNotFoundError: No module named 'airflow'

I'm using the Airflow PythonOperator to execute a Python Beam job with the Dataflow runner. The Dataflow job returns the error "ModuleNotFoundError: No module named 'airflow'".

In the Dataflow UI, the SDK version used when the job is launched via the PythonOperator is 2.15.0. If the job is executed from Cloud Shell, the SDK version used is 2.23.0. The job works when initiated from the shell.

The Environment details for Composer are:

Image version = composer-1.10.3-airflow-1.10.3

Python version = 3

A previous post suggested using the PythonVirtualenvOperator. I tried this with the settings:

requirements=['apache-beam[gcp]'],

python_version=3

Composer returns the error "'install', 'apache-beam[gcp]']' returned non-zero exit status 2."
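The attempt looked roughly like the sketch below; only requirements and python_version come from the settings above, while the task id and the callable are placeholders added for illustration:

    from airflow.operators.python_operator import PythonVirtualenvOperator

    def clean_csv_in_venv():
        # Placeholder callable: the operator serialises this function and re-runs it
        # inside the new virtualenv, so any imports it needs must live inside it.
        import apache_beam as beam

    run_in_venv = PythonVirtualenvOperator(
        task_id='clean_csv_in_venv',          # placeholder task id
        python_callable=clean_csv_in_venv,
        requirements=['apache-beam[gcp]'],
        python_version=3,
        dag=dag,
    )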

Any advice would be greatly appreciated.

This is the DAG that calls the Dataflow job. I have not shown all the functions used in the DAG, but I have kept the imports in:

    import logging
    import pprint
    import json
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
    from airflow.models import DAG
    import google.cloud.logging
    from datetime import timedelta
    from airflow.utils.dates import days_ago
    from deps import utils
    from google.cloud import storage
    from airflow.exceptions import AirflowException
    from deps import logger_montr
    from deps import dataflow_clean_csv
    
    
    
    # default_args (args) is defined elsewhere in the DAG file and not shown here.
    dag = DAG(dag_id='clean_data_file',
              default_args=args,
              description='Runs Dataflow to clean csv files',
              schedule_interval=None)
    
    def get_values_from_previous_dag(**context):
        var_dict = {}
        for key, val in context['dag_run'].conf.items():
            context['ti'].xcom_push(key, val)
            var_dict[key] = val
    
    populate_ti_xcom = PythonOperator(
        task_id='get_values_from_previous_dag',
        python_callable=get_values_from_previous_dag,
        provide_context=True,
        dag=dag,
    )
    
    
    dataflow_clean_csv = PythonOperator(
        task_id = "dataflow_clean_csv",
        python_callable = dataflow_clean_csv.clean_csv_dataflow,
        op_kwargs= {
         'project': 
         'zone': 
         'region': 
         'stagingLocation':
         'inputDirectory': 
         'filename': 
         'outputDirectory':     
        },
        provide_context=True,
        dag=dag,
    )

    populate_ti_xcom >> dataflow_clean_csv

I use ti.xcom_pull(task_ids='get_values_from_previous_dag') to assign the op_kwargs values.
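As a sketch of that pull (the helper below is illustrative, not part of the original DAG), each value pushed by get_values_from_previous_dag can be fetched back by key:

    def build_dataflow_kwargs(**context):
        # Illustrative helper: rebuilds the op_kwargs values from the XComs
        # pushed by the upstream task; the keys match the dict shown above.
        ti = context['ti']
        keys = ('project', 'zone', 'region', 'stagingLocation',
                'inputDirectory', 'filename', 'outputDirectory')
        return {k: ti.xcom_pull(task_ids='get_values_from_previous_dag', key=k)
                for k in keys}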

This is the Dataflow job that is being called:

import apache_beam as beam
import csv
import logging
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText


def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter=',', quoting=csv.QUOTE_ALL):
      line = [s.replace('\"', '') for s in line]
      clean_line = '","'.join(line)
      final_line = '"'+ clean_line +'"'
      return final_line

def clean_csv_dataflow(**kwargs): 
    argv = [
           # Dataflow pipeline options 
           "--region={}".format(kwargs["region"]),
           "--project={}".format(kwargs["project"]) ,
           "--temp_location={}".format(kwargs["stagingLocation"]),
           # Setting Dataflow pipeline options  
           '--save_main_session',
           '--max_num_workers=8',
           '--autoscaling_algorithm=THROUGHPUT_BASED', 
           # Mandatory constants
           '--job_name=cleancsvdataflow',
           '--runner=DataflowRunner'     
          ]
    options = PipelineOptions(
      flags=argv
      )
      
    pipeline = beam.Pipeline(options=options)
    
    inputDirectory = kwargs["inputDirectory"]
    filename = kwargs["filename"]
    outputDirectory = kwargs["outputDirectory"]

    
    outputfile_temp = filename
    outputfile_temp = outputfile_temp.split(".")
    outputfile = "_CLEANED.".join(outputfile_temp)   

    in_path_and_filename = "{}{}".format(inputDirectory,filename)
    out_path_and_filename = "{}{}".format(outputDirectory,outputfile)
    
    clean_csv = (pipeline 
      | "Read input file" >> beam.io.ReadFromText(in_path_and_filename)
      | "Parse file" >> beam.Map(parse_file)
      | "writecsv" >> beam.io.WriteToText(out_path_and_filename,num_shards=1)
    )
   
    pipeline.run()

Upvotes: 3

Views: 12070

Answers (1)

aga

Reputation: 3903

This answer was provided by @BSpinoza in the comment section:

What I did was move all of the imports out of the global namespace and place them inside the function definitions. Then, from the calling DAG, I used the BashOperator. It worked.
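As a rough sketch of that workaround (the script path and flags are placeholders, and it assumes the Beam module is given a small command-line entry point), the Beam job is launched as its own process:

    from airflow.operators.bash_operator import BashOperator

    # Running the pipeline in a separate process means --save_main_session pickles
    # only the Beam script's namespace, not Composer's airflow modules.
    run_dataflow_clean_csv = BashOperator(
        task_id='dataflow_clean_csv',
        bash_command='python /home/airflow/gcs/dags/deps/dataflow_clean_csv.py '
                     '--project=my-project --runner=DataflowRunner',  # placeholder path and flags
        dag=dag,
    )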

Also, one of the recommended ways is to use the DataFlowPythonOperator.
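A minimal sketch of that operator (paths, project, and bucket names are placeholders; the pipeline file must be runnable on its own, since the operator stages and submits it directly):

    from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

    clean_csv_dataflow_job = DataFlowPythonOperator(
        task_id='dataflow_clean_csv',
        py_file='/home/airflow/gcs/dags/deps/dataflow_clean_csv.py',  # placeholder path
        dataflow_default_options={
            'project': 'my-project',                 # placeholder
            'temp_location': 'gs://my-bucket/tmp/',  # placeholder
        },
        options={
            # Custom flags are passed through to the pipeline script, which has to
            # parse them itself (for example with argparse).
            'inputDirectory': 'gs://my-bucket/input/',   # placeholder
            'outputDirectory': 'gs://my-bucket/output/', # placeholder
        },
        dag=dag,
    )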

Upvotes: 1
