I am using Airflow version 2.4.3 on Cloud Composer version 2.1.11.
# this function creates a dictionary of sql file names and file paths from google cloud
def _file_dict(*op_args):
    bucket = op_args[0]
    path = op_args[1]
    from google.cloud import storage
    client = storage.Client()
    blobs = client.list_blobs(bucket, prefix=path)
    my_dict = {}
    for blob in blobs:
        if blob.name.endswith('.sql'):
            my_dict[blob.name] = blob.path
    print(my_dict.items())
    return my_dict
# and this function creates a bigquery task for each sql file
def _sql_file(file_nm: str, file_path: str, dag=None):
    hook = GCSHook()
    print(f'bucket name: {dag_bucket}')
    print(f'sql file name: {file_nm}')
    print(f'sql file path: {file_path}')
    try:
        object_name = f'{file_path}/{file_nm}'
        resp_byte = hook.download_as_byte_array(
            bucket_name=dag_bucket,
            object_name=object_name,
        )
    except Exception:
        e = sys.exc_info()
        print(e)
        sys.exit(1)
    resp_string = resp_byte.decode("utf-8")
    return BigQueryInsertJobOperator(
        task_id=f'create_view_{file_nm}',
        configuration={
            "query": {
                "query": resp_string,
                "useLegacySql": False,
            }
        },
        location='US',
        dag=dag,
    )
dag = DAG(
    dag_name,
    default_args=default_args,
    catchup=True,
    max_active_runs=1,
    schedule_interval=schedule_interval,
    description=dag_description,
    render_template_as_native_obj=True,
)
make_file_dict = PythonOperator(
    task_id='make_file_dict',
    provide_context=True,
    python_callable=_file_dict,
    op_args=[dag_bucket, 'view_definitions'],
    dag=dag,
)
file_dict = "{{ task_instance.xcom_pull(task_ids='make_file_dict') }}"
run_sql_from_file = [
    _sql_file(file_nm=k, file_path=v, dag=dag)
    for k, v in file_dict.items()
]
make_file_dict >> run_sql_from_file
I was expecting the xcom_pull to result in a dictionary because render_template_as_native_obj is true. There are aspects of Airflow/Python that I don't understand yet (namely decorators and Jinja templates), and I'm clearly doing something wrong, but I'm not sure how to make this work.
If I create a dictionary of hardcoded values, it works fine. I'd rather not do that for two reasons: 1) I want the job to automatically pick up any .sql files that are pushed into the repo, and 2) there are a lot of files.
Any help is appreciated.
The error is:
Broken DAG: [/home/airflow/gcs/dags/basic_view_dag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/home/airflow/gcs/dags/basic_view_dag.py", line 165, in <module>
    for k,v in file_dict.items()
AttributeError: 'str' object has no attribute 'items'
I have tried and failed with the following variations:
ast.literal_eval("{{ task_instance.xcom_pull(task_ids='make_file_dict') }}")
json.loads("{{ task_instance.xcom_pull(task_ids='make_file_dict') }}")
def str_to_dict(string):
    string = string.strip('{}')
    string = string.strip('[]')
    pairs = string.split(', ')
    return {key[1:-2]: int(value) for key, value in (pair.split(': ') for pair in pairs)}
You're not using the file_dict properly. Try passing it as op_args or op_kwargs to your python_callable. That will make Airflow render the Jinja template and pass the rendered value to the function.
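A minimal sketch of that idea, reusing the dag and make_file_dict from your code (the callable _run_sql_files and its task are hypothetical names, and the body just iterates to show the rendered dict):

from airflow.operators.python import PythonOperator

def _run_sql_files(file_dict: dict):
    # By the time this runs, file_dict is a real dict: op_kwargs is a
    # templated field, so Airflow renders the Jinja string at task runtime,
    # and render_template_as_native_obj=True turns the rendered value
    # into a native Python object.
    for file_nm, file_path in file_dict.items():
        print(f'sql file name: {file_nm}, path: {file_path}')
        # ...download the .sql file and run the query here...

run_sql_files = PythonOperator(
    task_id='run_sql_files',
    python_callable=_run_sql_files,
    op_kwargs={
        'file_dict': "{{ task_instance.xcom_pull(task_ids='make_file_dict') }}",
    },
    dag=dag,
)

make_file_dict >> run_sql_files

Templates are only rendered when a task executes, never at DAG-parse time, which is why calling .items() on the template string at the top level of the file raises AttributeError: at that point it is still just a string.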