Reputation: 28795
I have a function which gets a list of tables from a BigQuery dataset:
def get_table_names(**kwargs):
    client = bigquery.Client()
    # get source tables
    source_tables = []
    for table in client.list_tables(
            Template('$project.$dataset').substitute(project=SOURCE_PROJECT, dataset=SOURCE_DATASET)):
        if table.table_id.startswith(TABLE_PREFIX):
            source_tables.append(table.table_id)
    logging.info(str(len(source_tables)) + ' tables scheduled to move')
    return source_tables
I was initially calling this function from a task of type PythonOperator and, although I wasn't returning a value, it ran fine and logged '524 tables scheduled to move'.
I'm now calling it as part of the dag setup so that I can instantiate tasks for each table (I've not written this part yet):
table_tasks = get_table_names()
But as soon as I call it, the Composer/Airflow web interface stops recognising the DAG. It's still listed, and if I click the reload icon I get the usual 'fresh as a daisy' message, but if I try to open the DAG I get:
DAG "GA360_Replication" seems to be missing
Upvotes: 1
Views: 1322
Reputation: 2670
The most likely reason for the DAG to be missing is an error in the code that prevents the scheduler from picking the DAG up. You could also check whether there are two .py files with the same DAG name. I have also seen this happen when you upload a .py file with a different file name but the same DAG name as an existing one (even if you delete the previous .py file). This is hard to troubleshoot without checking the environment/logs, but I think those are the most likely scenarios. Feel free to contact support if you still get this issue.
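As a quick way to surface that kind of parse-time error yourself, here is a minimal sketch (not part of Composer, and assuming you can run Python somewhere with Airflow, the BigQuery client, and your credentials available; the DAG file path is hypothetical). Importing the DAG file does essentially what the scheduler does, so the exception it swallows shows up directly:

# check_dag_parse.py - minimal sketch, assumptions noted above
import importlib.util

from airflow import models

DAG_FILE = "dags/ga360_replication.py"  # hypothetical path to your DAG file

# Import the DAG module the same way the scheduler parses it;
# any parse-time error (bad imports, failing top-level calls, etc.) raises here.
spec = importlib.util.spec_from_file_location("dag_under_test", DAG_FILE)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)

# List the DAG objects that were actually created at parse time.
dag_ids = [v.dag_id for v in vars(module).values() if isinstance(v, models.DAG)]
print("Parsed OK, DAG ids found:", dag_ids)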
Either way, I made a DAG that works fine in Composer 1.7.1 (Airflow 1.10.2, Python 3). Reading the question and code, it feels like you want to pass the list of tables to the next task, so I added one that simply prints them, using XCom:
import datetime
import os
import airflow
from airflow import models
from airflow.operators import python_operator
from google.cloud import bigquery
import time
import logging

default_dag_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1)
}

with models.DAG(
        'test_table_xcom',
        default_args=default_dag_args,
        schedule_interval="@daily") as dag:

    TABLE_PREFIX = 'test'
    SOURCE_PROJECT = <PROJECT>
    SOURCE_DATASET = <DATASET>

    def get_table_names(**kwargs):
        client = bigquery.Client()
        source_tables = []
        dataset = '{}.{}'.format(SOURCE_PROJECT, SOURCE_DATASET)
        for table in client.list_tables(dataset):
            if table.table_id.startswith(TABLE_PREFIX):
                source_tables.append(table.table_id)
        logging.info('{} tables scheduled to move'.format(len(source_tables)))
        return source_tables

    def print_tables(**kwargs):
        ti = kwargs['ti']
        tables_list = ti.xcom_pull(task_ids='list_tables')
        for table in tables_list:
            print(table)

    listTables = python_operator.PythonOperator(
        task_id='list_tables',
        python_callable=get_table_names,
        provide_context=True)

    tablePrint = python_operator.PythonOperator(
        task_id='print_tables',
        python_callable=print_tables,
        provide_context=True)

    listTables >> tablePrint
Last but not least, note that Airflow is not meant to perform ETL operations by itself but to schedule them. Using XCom is not recommended (as documented*), since it may overload the database (in this case Cloud SQL) that runs under the hood of Airflow/Composer. For this particular case, where you'd be transferring a list of table names, I don't think it will be a problem, but it's better to be aware of this recommendation.
*if two operators need to share information, like a filename or small amount of data, you should consider combining them into a single operator.
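As a rough illustration of that recommendation (my own sketch, not part of the answer's DAG), the two steps could be folded into a single callable so the table list never goes through XCom. This assumes it sits inside the same with models.DAG(...) block above and reuses the names defined there (TABLE_PREFIX, SOURCE_PROJECT, SOURCE_DATASET); the task id and function name are made up:

    def list_and_print_tables(**kwargs):
        # Do both steps in one task so nothing has to be pushed to XCom.
        client = bigquery.Client()
        dataset = '{}.{}'.format(SOURCE_PROJECT, SOURCE_DATASET)
        source_tables = [t.table_id for t in client.list_tables(dataset)
                         if t.table_id.startswith(TABLE_PREFIX)]
        logging.info('{} tables scheduled to move'.format(len(source_tables)))
        for table in source_tables:
            print(table)

    listAndPrint = python_operator.PythonOperator(
        task_id='list_and_print_tables',
        python_callable=list_and_print_tables,
        provide_context=True)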
Upvotes: 1