Neel

Reputation: 43

Airflow UI cannot manually trigger a DAG

Versions:

A simple hello world DAG won't run when manually triggered from the UI (the trigger button). The same example runs fine from the command line. I would like to allow users to trigger jobs from the UI. Is this a bug?

Example Hello world DAG tested:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_welcome():
    return 'Welcome!'

dag = DAG('say_welcome', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

dummy_operator = DummyOperator(task_id='say_welcome_dummy_task', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='say_welcome_task', python_callable=print_welcome, dag=dag)

dummy_operator >> hello_operator

Test output from the command line:

(airfow_v1_venv) sshuser@ed41-kp06sp:~/airflowv1/dags$ airflow trigger_dag say_welcome
[2018-12-03 19:38:34,679] {__init__.py:51} INFO - Using executor LocalExecutor
[2018-12-03 19:38:34,956] {models.py:271} INFO - Filling up the DagBag from /home/sshuser/airflowv1/dags
[2018-12-03 19:38:35,071] {cli.py:241} INFO - Created <DagRun say_welcome @ 2018-12-03 19:38:34+00:00: manual__2018-12-03T19:38:34+00:00, externally triggered: True>

Logs when triggered from the UI (traceback truncated):

    context)
  File "/home/sshuser/airfow_v1_venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
    cursor.executemany(statement, parameters)
IntegrityError: (pyodbc.IntegrityError) ('23000', u"[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Violation of PRIMARY KEY constraint 'PK__task_ins__9BEABD04E2A8D429'. Cannot insert duplicate key in object 'dbo.task_instance'. The duplicate key value is (say_welcome_task, say_welcome, Dec  3 2018  7:40PM). (2627) (SQLExecDirectW)") [SQL: u'INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'] [parameters: (('say_welcome_task', 'say_welcome', datetime.datetime(2018, 12, 3, 19, 40, 9, 787000, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 0, u'', 'sshuser', None, None, 'default', 1, None, None, None, bytearray(b'\x80\x02}q\x00.')), ('say_welcome_dummy_task', 'say_welcome', datetime.datetime(2018, 12, 3, 19, 40, 9, 787000, tzinfo=<Timezone [UTC]>), None, None, None, None, 0, 3, u'', 'sshuser', None, None, 'default', 2, None, None, None, bytearray(b'\x80\x02}q\x00.')))]

Upvotes: 0

Views: 3234

Answers (1)

Meghdeep Ray

Reputation: 5537

It looks to me like you tried to overwrite the old say_welcome DAG.

Create one called say_welcome_v1 and give it a shot.

When you create a new version of a DAG, you have to change its name so that it can be distinguished in the meta DB. Hence the convention of appending _v1, _v2, etc. to the DAG name each time the DAG changes.
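As a minimal sketch, only the dag_id needs to change; the rest of the definition from your example stays the same:

from datetime import datetime
from airflow import DAG

# Same definition as before, but with a versioned dag_id, so the
# scheduler and meta DB treat it as a brand-new DAG instead of
# colliding with rows recorded for the old say_welcome.
dag = DAG('say_welcome_v1', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)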

The error you are getting is an integrity error, which occurs when you try to insert a row into the DB with the same primary key as a row that is already there. It is most likely caused by a new DAG having the same name as an old one.
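If you want to confirm that, here is a rough sketch that inspects the existing task_instance rows through Airflow's own ORM session (this assumes the Airflow 1.x airflow.settings.Session and airflow.models.TaskInstance APIs; say_welcome is the dag_id from your question):

from airflow import settings
from airflow.models import TaskInstance

# Open a session against whatever metadata DB Airflow is configured to use.
session = settings.Session()

# List the task_instance rows already stored for this dag_id; any row with
# the same (task_id, dag_id, execution_date) as a newly triggered run will
# violate the PRIMARY KEY constraint shown in the error above.
for ti in session.query(TaskInstance).filter(TaskInstance.dag_id == 'say_welcome'):
    print("%s %s %s" % (ti.task_id, ti.execution_date, ti.state))

session.close()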

If you don't have any old DAG runs whose history/logs are worth keeping, you can just use airflow resetdb followed by airflow initdb to reset your database and start from scratch.

You can also delete the old DAG ID from the meta DB using airflow delete_dag my_dag_id, available as of Airflow 1.10.
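Roughly, the two cleanup options look like this from the command line (the -y flag skips the confirmation prompt; double-check against your Airflow version's CLI help):

# Option 1: wipe the metadata DB entirely (destroys all DAG run history)
airflow resetdb -y
airflow initdb

# Option 2 (Airflow >= 1.10): remove only the offending DAG's metadata
airflow delete_dag say_welcome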

Upvotes: 1
