Reputation: 547
I am new to Python and Airflow, and I am using a GCP Composer environment to create a DAG.
In this Python code I created two tasks: one reads a zip/csv file, and the other creates a Dataproc cluster. In the first task I call a method readYML that reads the YAML configuration file holding the Dataproc cluster arguments (cluster_name, project_id, etc.), and I want to use the same arguments in the second task. See the code below for a better understanding:
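For reference, the code below expects a cluster_config.yml with a top-level configs mapping. Only the keys are taken from the code; the values here are just placeholders:
# cluster_config.yml -- keys match what the DAG reads; values are placeholders
configs:
  project_id: my-gcp-project
  cluster_name: my-dataproc-cluster
  num_workers: 2
  zone: europe-west1-b
  master_machine_type: n1-standard-2
  worker_machine_type: n1-standard-2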
# Importing Modules
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.models import Variable
import yaml
from google.cloud import storage
from airflow.contrib.operators import dataproc_operator
import pandas as pd

global cfg

def readYML():
    print("inside readzip")
    file_name = "/home/airflow/gcs/data/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        cfg = yaml.load(ymlfile)
    print("inside readYML method : ", cfg['configs']['project_id'])

def iterate_bucket():
    global blobs
    bucket_name = 'europe-west1-airflow-test-9bbb5fc7-bucket'
    storage_client = storage.Client.from_service_account_json(
        '/home/airflow/gcs/data/service_account_key_gcp_compute_bmg.json')
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()

def print_PcsvData():
    iterate_bucket()
    readYML()
    global readPcsv
    for blob in blobs:
        if "physical.zip" in blob.name:
            print("hello : ", blob.name)
            file_name = "/home/airflow/gcs/" + blob.name
            with ZipFile(file_name, 'r') as zip:
                # printing all the contents of the zip file
                for info in zip.infolist():
                    readfilename = info.filename
                    print(readfilename)
                    readPcsv = pd.read_csv("/home/airflow/gcs/data/" + readfilename)
                    print("physical.csv : ", readPcsv)
            print('Done!')

dag_name = Variable.get("dag_name")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'cluster_name': cfg['configs']['cluster_name'],
}

# Instantiate a DAG
dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))

# Creating Tasks
t1 = PythonOperator(task_id='Raw1', python_callable=print_PcsvData,
                    dag=dag)

create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    project_id=cfg['configs']['project_id'],
    cluster_name=cfg['configs']['cluster_name'],
    num_workers=cfg['configs']['num_workers'],
    zone=cfg['configs']['zone'],
    master_machine_type=cfg['configs']['master_machine_type'],
    worker_machine_type=cfg['configs']['worker_machine_type'],
    dag=dag)

t1 >> create_dataproc_cluster
In this code I want to use the cfg variable globally; I also want to access it in default_args, but I am getting an error. I don't know whether it is a scope-related issue or something else, even though I also declared the cfg variable inside the readYML method. Any help would be appreciated. Thanks in advance.
Upvotes: 0
Views: 5706
Reputation: 18844
Check the DAG file below that you should use.

A few changes that you should make:
- Return cfg from readYML() and call it once at module level (cfg = readYML()), so that default_args and the Dataproc operator can use it; assigning it inside the function only creates a local variable.
- Don't use datetime.now() as start_date; use a static/past date such as airflow.utils.dates.days_ago(2). See https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
- Return blobs and readPcsv from the helper functions instead of relying on global variables.

Updated file:
# Importing Modules
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from zipfile import ZipFile
from airflow.models import Variable
import yaml
from google.cloud import storage
from airflow.contrib.operators import dataproc_operator
import pandas as pd

def readYML():
    print("inside readzip")
    file_name = "/home/airflow/gcs/data/cluster_config.yml"
    with open(file_name, 'r') as ymlfile:
        cfg = yaml.load(ymlfile)
    print("inside readYML method : ", cfg['configs']['project_id'])
    return cfg

def iterate_bucket():
    bucket_name = 'europe-west1-airflow-test-9bbb5fc7-bucket'
    storage_client = storage.Client.from_service_account_json(
        '/home/airflow/gcs/data/service_account_key_gcp_compute_bmg.json')
    bucket = storage_client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()
    return blobs

def print_PcsvData():
    blobs = iterate_bucket()
    for blob in blobs:
        if "physical.zip" in blob.name:
            print("hello : ", blob.name)
            file_name = "/home/airflow/gcs/" + blob.name
            with ZipFile(file_name, 'r') as zip:
                # printing all the contents of the zip file
                for info in zip.infolist():
                    readfilename = info.filename
                    print(readfilename)
                    readPcsv = pd.read_csv("/home/airflow/gcs/data/" + readfilename)
                    print("physical.csv : ", readPcsv)
            print('Done!')
    return readPcsv

dag_name = Variable.get("dag_name")

# Read the YAML config once at module level so it is available below
cfg = readYML()

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'cluster_name': cfg['configs']['cluster_name'],
}

# Instantiate a DAG
dag = DAG(dag_id='read_yml', default_args=default_args,
          schedule_interval=timedelta(days=1))

# Creating Tasks
t1 = PythonOperator(task_id='Raw1', python_callable=print_PcsvData,
                    dag=dag)

create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
    task_id='create_dataproc_cluster',
    project_id=cfg['configs']['project_id'],
    cluster_name=cfg['configs']['cluster_name'],
    num_workers=cfg['configs']['num_workers'],
    zone=cfg['configs']['zone'],
    master_machine_type=cfg['configs']['master_machine_type'],
    worker_machine_type=cfg['configs']['worker_machine_type'],
    dag=dag)

t1 >> create_dataproc_cluster
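For completeness, the scoping problem in your original file can be reproduced outside Airflow entirely; this is just a minimal sketch of the Python behaviour (names and values here are made up for illustration):
# Assigning to a name inside a function creates a function-local variable,
# so the module-level name never gets set, even with a module-level "global cfg".
cfg = None

def read_config_wrong():
    cfg = {"configs": {"cluster_name": "demo"}}  # local variable, lost on return

def read_config_right():
    return {"configs": {"cluster_name": "demo"}}  # hand the value back instead

read_config_wrong()
print(cfg)                               # still None

cfg = read_config_right()                # rebind the module-level name
print(cfg["configs"]["cluster_name"])    # prints "demo"
This is why calling cfg = readYML() at module level, as in the updated file, makes cfg available to default_args and to the DataprocClusterCreateOperator arguments.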
Upvotes: 2