Reputation: 97
Was attempting to add a new DAG to our Google Cloud Composer instance - we have 32+ DAGs currently - and doing the usual things in https://cloud.google.com/composer/docs/how-to/using/managing-dags doesn't appear to be having any effect - we can't see these DAGs in the webserver/UI and I don't see that they are necessarily being loaded. I do see them being copied to the appropriate bucket in the logs but nothing beyond that.
I even tried setting a dummy environment variable to kick off a full restart of the Composer instance but to no avail.
Finally I've put together an entirely stripped down DAG and attempted to add it. Here is the DAG:
from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
dag = models.Dag(
dag_id="test-dag",
schedule_interval=None,
start_date=datetime(2020, 3, 9),
max_active_runs=1,
catchup=False,
)
task_test = DummyOperator(dag=dag, task_id="test-task")
Even this simple DAG isn't getting picked up so I'm wondering what I can try next. I looked through https://github.com/apache/airflow/blob/master/airflow/config_templates/default_airflow.cfg in an effort to see if perhaps there was anything I might tweak in here in terms of DagBag loading time limits, etc. but nothing jumps off. Totally stumped here.
Upvotes: 4
Views: 2917
Reputation: 97
We eventually figured out the issue:
dag = models.Dag(
should be
dag = models.DAG(
Upvotes: 1
Reputation: 2094
Your example is not picked up by my environment either. However, I've tried with the following format and was picked up without issues:
from airflow import DAG
from datetime import datetime
from airflow.contrib.operators import kubernetes_pod_operator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
with DAG(
"my-test-dag",
schedule_interval=None,
start_date=datetime(2020, 3, 9),
max_active_runs=1,
catchup=False) as dag:
task_test = DummyOperator(dag=dag, task_id="my-test-task")
Upvotes: 2