7yl4r

Reputation: 5348

Ooops... AttributeError when clearing failed task state in airflow

I am trying to clear a failed task so that it will run again.

I usually do this with the web GUI, from the tree view:

[screenshot: tree view showing the failed task and the "Clear" popup]

After selecting "Clear" I am directed to an error page:

[screenshot: error page]

The traceback on this page is the same error I receive when trying to clear this task using the CLI:

[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t coverage_check gom_modis_aqua_coverage_check
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
    include_upstream=args.upstream,
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
    y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'

Looking for ideas on what may have caused this, what I should do to fix this task, and how to avoid this in the future.

I was able to work around the issue by deleting the task record using the "Browse > Task Instances" search, but would still like to explore the issue as I have seen this multiple times.

Although my DAG code is getting complicated, here is an excerpt from where the operator is defined within the DAG (a simplified sketch of the callable follows the excerpt):

    trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
    coverage_check = BranchPythonOperator(
        task_id='coverage_check',
        python_callable=_coverage_check,
        provide_context=True,
        retries=10,
        retry_delay=timedelta(hours=3),
        queue=QUEUE.PYCMR,
        op_kwargs={
            'roi':region,
            'success_branch_id': trigger_granule_dag_id
        }
    )
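
For context, the _coverage_check callable returns the task_id of the branch to follow, roughly like this (a simplified sketch, not the full implementation; covered() and 'wait_for_data' are placeholders):

    def _coverage_check(roi, success_branch_id, **kwargs):
        # covered() stands in for the real coverage test
        if covered(roi):
            return success_branch_id
        return 'wait_for_data'  # placeholder task_id for the other branch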

The full source code can be browsed at github/USF-IMARS/imars_dags. Here are links to the most relevant parts:

Upvotes: 6

Views: 2396

Answers (2)

artwr

Reputation: 11

During some operations, Airflow deep-copies some objects. Unfortunately, some objects do not allow this. The boto client is a good example of something that does not deep-copy nicely; thread objects are another. Large objects with nested references, such as a reference to a parent task, can also cause issues.
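
For example, a thread lock is one of the simplest objects that refuses to be copied; a minimal, Airflow-free illustration:

import copy
import threading

lock = threading.Lock()
copy.deepcopy(lock)
# Raises TypeError (message varies by Python version, e.g.
# "cannot pickle '_thread.lock' object"): locks define neither
# __deepcopy__ nor any pickle support, so deepcopy has no way to
# reproduce them.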

In general, you do not want to instantiate a client in the DAG code itself. That said, I do not think that is your issue here, though I do not have access to the pyCMR code to check whether it could be.
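
That first point can be sketched as follows (hypothetical names; boto3 stands in for any client that does not survive a deepcopy): create the client inside the callable, at execution time, so it is never stored on the operator or the DAG.

import boto3


def _my_task_callable(**context):
    # Hypothetical callable: the client is created at run time, so the
    # operator (and hence the DAG) never holds a boto3 client and stays
    # safe to deepcopy when tasks are cleared.
    s3 = boto3.client('s3', region_name='us-east-1')
    return s3.list_buckets()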

Upvotes: 1

Sai Neelakantam

Reputation: 939

Below is a sample DAG that I created to mimic the error that you are facing.

from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, DummyOperator


def athena_data_validation(**kwargs):
    # Stub: the body is irrelevant here; the error is reproduced by the
    # module-level client below, not by what this function does.
    pass


start_date = datetime.now()

args = {
    'owner': 'airflow',
    'start_date': start_date,
    'depends_on_past': False,
    'wait_for_downstream': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

dag_name = 'data_validation_dag'

schedule_interval = None  

dag = DAG(
    dag_id=dag_name,
    default_args=args,
    schedule_interval=schedule_interval)

# Module-level boto3 client: this object is attached to the operator via
# op_kwargs and is what later fails to deepcopy when the task is cleared.
athena_client = boto3.client('athena', region_name='us-west-2')

DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

start_task = DummyOperator(task_id='Start_Task', dag=dag)

end_task = DummyOperator(task_id='End_Task', dag=dag)

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'athena_client': athena_client,
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)
data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)

After one successful run, I tried to clear the Data_Validation task and got the same error.

I removed the athena_client object creation from module level and placed it inside the athena_data_validation function (see the sketch below), and then it worked. When you clear a task in the Airflow UI, Airflow does a deepcopy of the DAG and all of its objects from the previous run. I am still trying to understand why it cannot copy this particular object type, but this workaround worked for me.
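
The fix, as a sketch (only the callable changes; the rest of the DAG stays the same, and the query logic is elided):

def athena_data_validation(**kwargs):
    # Build the client at execution time instead of at module level, so
    # it is never attached to the operator and never goes through
    # Airflow's deepcopy.
    athena_client = boto3.client('athena', region_name='us-west-2')
    # ... run the validation query with athena_client ...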

Upvotes: 2
