Amy Gao

Reputation: 11

"Dag Seems to be missing" error in a Cloud Composer Airflow Dynamic DAG

I have a dynamic Airflow DAG in Google Cloud Composer that gets created, is listed in the web server, and runs (backfill) without error. However, there are issues:

  1. When clicking on the DAG in the web UI, it says "DAG seems to be missing"
  2. Can't see the Graph view/Tree view; the same error is shown
  3. Can't manually trigger the DAG; the same error is shown

I've been trying to fix this for a couple of days... any hint will be helpful. Thank you!

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from google.cloud import storage
from airflow.models import Variable
from datetime import datetime
import json


args = {
    'owner': 'xxx',
    'start_date': datetime(2020, 11, 5),
    'provide_context': True
}


dag = DAG(
    dag_id='dynamic',
    default_args=args
    )


def return_bucket_files(bucket_name='xxxxx', **kwargs):
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blobs = bucket.list_blobs()

    file_list = [blob.name for blob in blobs]

    return file_list


def dynamic_gcs_to_gbq_etl(file, **kwargs):

    mapping = json.loads(Variable.get("xxxxx"))
    database = mapping[0][file]
    table = mapping[1][file]

    task = GoogleCloudStorageToBigQueryOperator(
        task_id=f'gcs_load_{file}_to_gbq',
        bucket='xxxxxxx',
        source_objects=[f'{file}'],
        destination_project_dataset_table=f'xxx.{database}.{table}',
        write_disposition='WRITE_TRUNCATE',
        autodetect=True,
        skip_leading_rows=1,
        source_format='CSV',
        dag=dag)

    return task


start_task = DummyOperator(
    task_id='start',
    dag=dag
)


end_task = DummyOperator(
    task_id='end',
    dag=dag)


push_bucket_files = PythonOperator(
        task_id="return_bucket_files",
        provide_context=True,
        python_callable=return_bucket_files,
        dag=dag)


# Note: this loop runs at DAG parse time, so every DagBag fill lists the bucket.
for file in return_bucket_files():
    gcs_load_task = dynamic_gcs_to_gbq_etl(file)
    start_task >> push_bucket_files >> gcs_load_task >> end_task

Upvotes: 1

Views: 1743

Answers (1)

Enrique Zetina

Reputation: 835

This error means that the web server is failing to fill its DagBag with this DAG; the problem is most likely not with your DAG logic itself. Note that in Cloud Composer the web server runs in a Google-managed tenant project, so a DAG file that does live work at parse time (here, the module-level call to return_bucket_files(), which lists a GCS bucket) can fail or time out on the web server even though the scheduler and workers parse it without error.
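As a rough sketch of one way to make the file safe to parse anywhere (this is not part of the original answer, and the Variable name bucket_file_list is a placeholder), you could cache the listing in an Airflow Variable and fall back to the cache when GCS is unreachable:

def return_bucket_files(bucket_name='xxxxx', **kwargs):
    try:
        # Live listing: works on the scheduler/workers, but may fail on
        # the web server, which cannot always reach your project's GCS.
        client = storage.Client()
        file_list = [blob.name for blob in client.get_bucket(bucket_name).list_blobs()]
        # Cache the result so a parse that cannot reach GCS still
        # produces the same task list.
        Variable.set("bucket_file_list", json.dumps(file_list))
    except Exception:
        # Fall back to the cached listing; an empty default keeps the
        # module importable on a fresh environment.
        file_list = json.loads(Variable.get("bucket_file_list", default_var="[]"))
    return file_list

With a guard like this the web server can always import the module, so the Graph/Tree views should come back even when the cached list is slightly stale.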

My suggestion right now would be to restart the web server (for example, via the installation of a dummy PyPI package, which forces Composer to restart it).
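For reference, that package trick can be done from the CLI; the environment name, location, and package pin below are all placeholders:

gcloud composer environments update my-composer-env \
    --location us-central1 \
    --update-pypi-package "six==1.15.0"

Updating the package list forces Composer to rebuild the environment, which restarts the web server.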

Similar issues have been reported in this post, as well as here.

Upvotes: 0
