Bhagesh Arora

Reputation: 547

Broken DAG: [/home/airflow/gcs/dags/airflow_test_task.py] name 'cfg' is not defined

I am new to Python and Airflow, and I am using a GCP Composer environment to create a DAG.
In this Python code I created two tasks: one reads a zip or csv file, and the other creates a Dataproc cluster. In the first task I call a method, readYML, which reads a YAML configuration file containing the Dataproc cluster arguments (cluster_name, project_id, etc.), and I use the same arguments in the second task. See the code below for a better understanding:

# Importing Modules

from airflow import DAG

from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile

from airflow.models import Variable
import yaml
from google.cloud import storage
from airflow.contrib.operators import dataproc_operator
import pandas as pd


global cfg

def readYML():
    print("inside readzip")
    file_name = "/home/airflow/gcs/data/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        cfg = yaml.load(ymlfile)
    print("inside readYML method : ", cfg['configs']['project_id'])


def iterate_bucket():
    global blobs
    bucket_name = 'europe-west1-airflow-test-9bbb5fc7-bucket'
    storage_client = storage.Client.from_service_account_json(
        '/home/airflow/gcs/data/service_account_key_gcp_compute_bmg.json')
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()


def print_PcsvData():
    iterate_bucket()
    readYML()
    global readPcsv

    for blob in blobs:
        if "physical.zip" in blob.name:
            print("hello : ", blob.name)
            file_name = "/home/airflow/gcs/" + blob.name

    with ZipFile(file_name, 'r') as zip:
        # printing all the contents of the zip file
        for info in zip.infolist():
            readfilename = info.filename
            print(readfilename)

    readPcsv = pd.read_csv("/home/airflow/gcs/data/" + readfilename)

    print("physi cal.csv : ", readPcsv)
    print('Done!')


dag_name = Variable.get("dag_name")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'cluster_name': cfg['configs']['cluster_name'],
}

# Instantiate a DAG

dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))

# Creating Tasks

t1 = PythonOperator(task_id='Raw1', python_callable=print_PcsvData,
                    dag=dag)

create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    project_id=cfg['configs']['project_id'],
    cluster_name=cfg['configs']['cluster_name'],
    num_workers=cfg['configs']['num_workers'],
    zone=cfg['configs']['zone'],
    master_machine_type=cfg['configs']['master_machine_type'],
    worker_machine_type=cfg['configs']['worker_machine_type'],
    dag=dag)

t1 >> create_dataproc_cluster

In this code I want to use the cfg variable globally; I want to access it in default_args as well, but I am getting the error above. I don't know whether it is a scope-related issue or something else. I even declared the cfg variable inside the readYML method, but the error still persists. Any help would be appreciated. Thanks in advance.

Upvotes: 0

Views: 5706

Answers (1)

kaxil

Reputation: 18844

Check the updated DAG file below.

A few changes you should make:

- Remove the "global cfg" statement. A global declaration at module level has no effect, and assigning to cfg inside readYML only creates a local variable that disappears when the function returns (see the sketch after this list). Instead, return cfg from readYML and call it once at module level, so the config is available to default_args and the operators.
- Likewise, return blobs from iterate_bucket instead of declaring it global.
- Use a fixed start_date such as days_ago(2) instead of datetime.now(); a start_date that changes every time the DAG file is parsed is an Airflow anti-pattern.
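
A minimal, standalone sketch of the scope issue (plain Python, independent of Airflow):

# A bare "global cfg" at module level does nothing: module-level names
# are already global. Assigning inside a function without returning the
# value just creates a local that vanishes when the function exits.

def read_config_broken():
    cfg = {"configs": {"project_id": "demo"}}  # local variable only


def read_config_fixed():
    cfg = {"configs": {"project_id": "demo"}}
    return cfg  # hand the value back to the caller


cfg = read_config_fixed()            # now cfg is a real module-level name
print(cfg["configs"]["project_id"])  # -> demo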

Updated file:

# Importing Modules

from airflow import DAG
from airflow.utils.dates import days_ago

from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile

from airflow.models import Variable
import yaml
from google.cloud import storage
from airflow.contrib.operators import dataproc_operator
import pandas as pd



def readYML():
    print("inside readzip")
    file_name = "/home/airflow/gcs/data/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        cfg = yaml.safe_load(ymlfile)  # safe_load avoids executing arbitrary YAML tags
    print("inside readYML method : ", cfg['configs']['project_id'])
    return cfg


def iterate_bucket():
    bucket_name = 'europe-west1-airflow-test-9bbb5fc7-bucket'
    storage_client = storage.Client.from_service_account_json(
        '/home/airflow/gcs/data/service_account_key_gcp_compute_bmg.json')
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()
    return blobs


def print_PcsvData():
    blobs = iterate_bucket()

    for blob in blobs:
        if "physical.zip" in blob.name:
            print("hello : ", blob.name)
            file_name = "/home/airflow/gcs/" + blob.name

    with ZipFile(file_name, 'r') as zip:
        # printing all the contents of the zip file
        for info in zip.infolist():
            readfilename = info.filename
            print(readfilename)

    readPcsv = pd.read_csv("/home/airflow/gcs/data/" + readfilename)

    print("physi cal.csv : ", readPcsv)
    print('Done!')
    return readPcsv

dag_name = Variable.get("dag_name")

# Read the YAML config once when the DAG file is parsed;
# cfg is then an ordinary module-level variable.
cfg = readYML()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'cluster_name': cfg['configs']['cluster_name'],
}

# Instantiate a DAG

dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))

# Creating Tasks

t1 = PythonOperator(task_id='Raw1', python_callable=print_PcsvData,
                    dag=dag)

create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    project_id=cfg['configs']['project_id'],
    cluster_name=cfg['configs']['cluster_name'],
    num_workers=cfg['configs']['num_workers'],
    zone=cfg['configs']['zone'],
    master_machine_type=cfg['configs']['master_machine_type'],
    worker_machine_type=cfg['configs']['worker_machine_type'],
    dag=dag)

t1 >> create_dataproc_cluster
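
For reference, the cluster_config.yml itself is not shown in the question; judging from the keys accessed above, it presumably looks something like this (all values are placeholders):

configs:
  project_id: my-gcp-project
  cluster_name: my-dataproc-cluster
  num_workers: 2
  zone: europe-west1-b
  master_machine_type: n1-standard-1
  worker_machine_type: n1-standard-1

One caveat: since readYML() now runs at module level, it executes every time the scheduler parses the DAG file, so keep it lightweight.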

Upvotes: 2
