Reputation: 5348
I am trying to clear a failed task so that it will run again.
I usually do this from the tree view in the web GUI.
After selecting "Clear", I am directed to an error page.
The traceback on this page is the same error I receive when trying to clear this task using the CLI:
[u@airflow01 ~]# airflow clear -s 2002-07-29T20:25:00 -t coverage_check gom_modis_aqua_coverage_check
[2018-01-16 16:21:04,235] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-01-16 16:21:05,192] {models.py:167} INFO - Filling up the DagBag from /root/airflow/dags
Traceback (most recent call last):
  File "/usr/bin/airflow", line 28, in <module>
    args.func(args)
  File "/usr/lib/python3.4/site-packages/airflow/bin/cli.py", line 612, in clear
    include_upstream=args.upstream,
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3173, in sub_dag
    dag = copy.deepcopy(self)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 3159, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 166, in deepcopy
    y = copier(memo)
  File "/usr/lib/python3.4/site-packages/airflow/models.py", line 2202, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/usr/lib64/python3.4/copy.py", line 155, in deepcopy
    y = copier(x, memo)
  File "/usr/lib64/python3.4/copy.py", line 246, in _deepcopy_dict
    y[deepcopy(key, memo)] = deepcopy(value, memo)
  File "/usr/lib64/python3.4/copy.py", line 182, in deepcopy
    y = _reconstruct(x, rv, 1, memo)
  File "/usr/lib64/python3.4/copy.py", line 309, in _reconstruct
    y.__dict__.update(state)
AttributeError: 'NoneType' object has no attribute 'update'
I am looking for ideas on what may have caused this, what I should do to fix this task, and how to avoid this in the future.
I was able to work around the issue by deleting the task record using the "Browse > Task Instances" search, but would still like to explore the issue as I have seen this multiple times.
Although my DAG code is getting complicated, here is an excerpt from where the operator is defined within the dag:
trigger_granule_dag_id = 'trigger_' + process_pass_dag_name
coverage_check = BranchPythonOperator(
    task_id='coverage_check',
    python_callable=_coverage_check,
    provide_context=True,
    retries=10,
    retry_delay=timedelta(hours=3),
    queue=QUEUE.PYCMR,
    op_kwargs={
        'roi': region,
        'success_branch_id': trigger_granule_dag_id
    }
)
The full source code can be browsed at github/USF-IMARS/imars_dags.
Upvotes: 6
Views: 2396
Reputation: 11
During some operations, Airflow deep-copies some objects. Unfortunately, some objects do not allow this. The boto client is a good example of something that does not deep-copy cleanly, and thread objects are another, but large objects with nested references, such as a reference to a parent task, can also cause issues.
In general, you do not want to instantiate a client in the DAG code itself. That said, I do not think that is your issue here, though I do not have access to the pyCMR code to check whether it could be.
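To illustrate the point about objects that cannot be deep-copied, here is a minimal sketch using only the standard library. `LockHoldingClient` and `try_deepcopy` are hypothetical names, standing in for any client that keeps a thread lock internally; this is not Airflow's own code, and the exception raised here may differ from the `AttributeError` in the question.

```python
import copy
import threading


class LockHoldingClient:
    """Hypothetical stand-in for a client object that keeps a lock internally."""

    def __init__(self):
        self._lock = threading.Lock()


def try_deepcopy(op_kwargs):
    """Return (True, None) if op_kwargs deep-copies, else (False, exception name)."""
    try:
        copy.deepcopy(op_kwargs)
        return True, None
    except Exception as exc:  # broad on purpose: we only report what went wrong
        return False, type(exc).__name__


# Plain values (strings, numbers) are safe to deep-copy.
plain_result = try_deepcopy({"roi": "gom", "success_branch_id": "trigger_x"})

# A dict holding a lock-bearing object fails, because locks cannot be copied.
client_result = try_deepcopy({"client": LockHoldingClient()})

print(plain_result, client_result)
```

Running a check like this against the values you pass in `op_kwargs` is one way to find out which argument breaks the copy.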
Upvotes: 1
Reputation: 939
Below is a sample DAG that I created to mimic the error that you are facing.
import logging
import os
from datetime import datetime, timedelta

import boto3
from airflow import DAG
from airflow import configuration as conf
from airflow.operators import ShortCircuitOperator, PythonOperator, DummyOperator


def athena_data_validation(**kwargs):
    pass


start_date = datetime.now()

args = {
    'owner': 'airflow',
    'start_date': start_date,
    'depends_on_past': False,
    'wait_for_downstream': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=30)
}

dag_name = 'data_validation_dag'
schedule_interval = None

dag = DAG(
    dag_id=dag_name,
    default_args=args,
    schedule_interval=schedule_interval)

athena_client = boto3.client('athena', region_name='us-west-2')
DAG_SCRIPTS_DIR = conf.get('core', 'DAGS_FOLDER') + "/data_validation/"

start_task = DummyOperator(task_id='Start_Task', dag=dag)
end_task = DummyOperator(task_id='End_Task', dag=dag)

data_validation_task = ShortCircuitOperator(
    task_id='Data_Validation',
    provide_context=True,
    python_callable=athena_data_validation,
    op_kwargs={
        'athena_client': athena_client,
        'sql_file': DAG_SCRIPTS_DIR + 'data_validation.sql',
        's3_output_path': 's3://XXX/YYY/'
    },
    dag=dag)

data_validation_task.set_upstream(start_task)
data_validation_task.set_downstream(end_task)
After one successful run, I tried to clear the Data_Validation task and got the same error.
I then removed the module-level athena_client object and created it inside the athena_data_validation function instead, and clearing worked. So when we do a clear in the Airflow UI, it tries to do a deepcopy and get all the objects from the previous run. I am still trying to understand why it is not able to copy this type of object, but this workaround is working for me.
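The workaround can be sketched roughly as follows. This is a standard-library illustration, not the actual DAG: `FakeAthenaClient` is a hypothetical stand-in for `boto3.client('athena', ...)`, and the `region_name` keyword is an assumption added so the callable can build its own client.

```python
import copy
import threading


class FakeAthenaClient:
    """Hypothetical stand-in for boto3.client('athena'); holds a lock like a real client might."""

    def __init__(self, region_name):
        self.region_name = region_name
        self._lock = threading.Lock()


def athena_data_validation(region_name, sql_file, **kwargs):
    # Create the client at run time, inside the callable, so that it never
    # becomes part of the task definition that gets deep-copied.
    client = FakeAthenaClient(region_name)
    return client.region_name, sql_file


# op_kwargs now holds only plain strings, which deep-copy without trouble.
op_kwargs = {"region_name": "us-west-2", "sql_file": "data_validation.sql"}
copied_kwargs = copy.deepcopy(op_kwargs)

# The callable still gets a working client when it actually runs.
result = athena_data_validation(**copied_kwargs)
print(result)
```

The design choice is to pass only the configuration needed to build the client (region, paths) through `op_kwargs`, and leave the construction of the client itself to the function body.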
Upvotes: 2