Tomasz Krol

Reputation: 668

Airflow backfill job failing even though test works fine

I am trying to execute a DAG with just one PythonOperator. When I test the task it works fine, and it also ran fine on an Airflow installation that does not use the CeleryExecutor.

But it fails, with no very descriptive error, when I try to backfill it on an Airflow installation running the CeleryExecutor:

airflow@ip:/home/admin$ airflow backfill REDSHIFT3 -s 2017-05-10
[2017-05-22 14:41:14,373] {__init__.py:57} INFO - Using executor CeleryExecutor
[2017-05-22 14:41:14,432] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2017-05-22 14:41:14,452] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2017-05-22 14:41:14,616] {models.py:167} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2017-05-22 14:41:14,994] {models.py:1126} INFO - Dependencies all met for <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [scheduled]>
[2017-05-22 14:41:15,000] {base_executor.py:50} INFO - Adding to queue: airflow run REDSHIFT3 get_data_redshift 2017-05-10T00:00:00 --pickle 81 --local
[2017-05-22 14:41:19,893] {celery_executor.py:78} INFO - [celery] queuing (u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0)) through celery, queue=default
[2017-05-22 14:41:20,598] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:20,607] {jobs.py:1978} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2017-05-22 14:41:24,954] {jobs.py:1725} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:24,954] {models.py:1417} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
None
[2017-05-22 14:41:24,954] {models.py:1441} INFO - Marking task as FAILED.
[2017-05-22 14:41:25,037] {models.py:1462} ERROR - Executor reports task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?
[2017-05-22 14:41:25,042] {jobs.py:1690} ERROR - Task instance <TaskInstance: REDSHIFT3.get_data_redshift 2017-05-10 00:00:00 [failed]> failed
[2017-05-22 14:41:25,044] {models.py:4024} INFO - Updating state for <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> considering 1 task(s)
[2017-05-22 14:41:25,047] {models.py:4064} INFO - Marking run <DagRun REDSHIFT3 @ 2017-05-10 00:00:00: backfill_2017-05-10T00:00:00, externally triggered: False> failed
[2017-05-22 14:41:25,087] {jobs.py:1978} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 0 | kicked_off: 0 | failed: 1 | skipped: 0 | deadlocked: 0 | not ready: 0
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 167, in backfill
    pool=args.pool)
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 3330, in run
    job.run()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 200, in run
    self._execute()
  File "/usr/local/lib/python2.7/dist-packages/airflow/jobs.py", line 2021, in _execute
    raise AirflowException(err)
airflow.exceptions.AirflowException: ---------------------------------------------------
Some task instances failed:
set([(u'REDSHIFT3', u'get_data_redshift', datetime.datetime(2017, 5, 10, 0, 0))])

Here is the DAG that I am trying to execute:

from __future__ import print_function
from builtins import range
import airflow
from pprint import pprint
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

import time

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(
    dag_id='REDSHIFT3',
    default_args=args,
    schedule_interval=None)

def get_data(ds, **kwargs):
    pprint(kwargs)

run_this = PythonOperator(
    task_id='get_data_redshift',
    provide_context=True,
    python_callable=get_data,
    dag=dag)
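
For reference, a minimal way to exercise the task body without the CeleryExecutor in the loop (a sketch that could go at the bottom of the DAG file, assuming the Airflow 1.x PythonOperator API; this is roughly what the successful test run does):

if __name__ == '__main__':
    # With provide_context=True, execute() forwards the context dict to
    # get_data as keyword arguments (plus templates_dict).
    run_this.execute(context={'ds': '2017-05-10'})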

Upvotes: 3

Views: 3626

Answers (1)

c-wilson

Reputation: 405

Hey, I had a related problem: the same error, but not while backfilling. When my cluster was under sustained heavy load (more than 50 workers, hundreds of tasks running concurrently), my database was maxing out its CPU.

For me this was due to my burstable (t2) RDS instance running out of CPU credits and throttling. Provisioning a bigger instance type resolved this issue for me.
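
If you are on RDS, this is quick to check; a minimal sketch using boto3 and CloudWatch (the instance identifier and region below are placeholders for your own values):

# Sketch: pull recent CPUCreditBalance for an RDS instance from CloudWatch.
# Assumptions: boto3 is installed, AWS credentials are configured, and
# 'my-airflow-db' / 'us-east-1' are placeholders.
from datetime import datetime, timedelta
import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
resp = cloudwatch.get_metric_statistics(
    Namespace='AWS/RDS',
    MetricName='CPUCreditBalance',   # also worth checking: CPUUtilization
    Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': 'my-airflow-db'}],
    StartTime=datetime.utcnow() - timedelta(hours=6),
    EndTime=datetime.utcnow(),
    Period=300,
    Statistics=['Average'],
)
for point in sorted(resp['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'], point['Average'])

A credit balance that drops to zero during busy periods means the instance is being throttled.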

Even if you're not on AWS, I would double-check that your database is not maxing out some resource constraint like CPU or I/O. I'm guessing this causes a race condition where the scheduler sets the TaskInstance's state to QUEUED and sends the task message to the message queue before the database has actually committed the state change, so the worker reads stale state, which would explain the "finished (failed) although the task says its queued" message. Hope that helps someone out there.
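
To make that guess concrete, here is a toy sketch of that kind of race (pure illustration with threads and in-memory stand-ins, not Airflow internals):

# Toy illustration of the hypothesized race: the "scheduler" publishes a
# message before its state change is committed, so the "worker" reads stale
# state and gives up instead of running the task.
import threading
import time

state = {'task': 'SCHEDULED'}   # stands in for the metadata database row
queue = []                      # stands in for the Celery broker

def scheduler(commit_delay):
    queue.append('run task')    # the message reaches the broker immediately...
    time.sleep(commit_delay)    # ...but the DB commit lags under heavy load
    state['task'] = 'QUEUED'

def worker():
    while not queue:
        time.sleep(0.01)
    queue.pop()
    if state['task'] != 'QUEUED':   # stale read: the commit has not landed yet
        print('worker: unexpected state %s, giving up' % state['task'])
        return
    print('worker: running task')

threading.Thread(target=scheduler, args=(0.5,)).start()
worker()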

Upvotes: 1
