hong

Reputation: 31

Airflow 1.10.0 BranchPythonOperator run failed: Celery command failed

I copied the code of the Airflow example DAG example_branch_dop_operator_v3 into my own DAG test1_v2. I can run example_branch_dop_operator_v3 successfully, but running test1_v2 fails. The test1_v2 DAG code (AIRFLOW_HOME/dags/test1.py):

import airflow
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': True,
}

dag = DAG(dag_id='test1_v2',
          schedule_interval='*/1 * * * *', default_args=args)


def should_run(ds, **kwargs):
    # Branch on the minute of the execution date:
    # even minute -> oper_1, odd minute -> oper_2.
    print('------------- exec dttm = {} and minute = {}'.
          format(kwargs['execution_date'], kwargs['execution_date'].minute))
    if kwargs['execution_date'].minute % 2 == 0:
        return "oper_1"
    else:
        return "oper_2"


cond = BranchPythonOperator(
    task_id='condition',
    provide_context=True,
    python_callable=should_run,
    dag=dag)

oper_1 = DummyOperator(
    task_id='oper_1',
    dag=dag)
oper_1.set_upstream(cond)

oper_2 = DummyOperator(
    task_id='oper_2',
    dag=dag)
oper_2.set_upstream(cond)

Running the command airflow run test1_v2 condition "2018-09-01 00:00:00" produces this worker log:

[2018-10-11 21:20:29,991] {cli.py:492} INFO - Running <TaskInstance: test1_v2.condition 2018-09-01T00:00:00+08:00 [queued]> on host CenT  
[2018-10-11 21:23:10,879] {settings.py:174} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800  
[2018-10-11 21:23:11,343] {__init__.py:51} INFO - Using executor CeleryExecutor  
[2018-10-11 21:23:11,572] {cli.py:478} INFO - Loading pickle id 26  
Traceback (most recent call last):  
  File "/home/airflow/airflow/venv/bin/airflow", line 32, in <module>  
    args.func(args)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/utils/cli.py", line 74, in wrapper  
    return f(*args, **kwargs)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/bin/cli.py", line 480, in run  
    DagPickle).filter(DagPickle.id == args.pickle).first()  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2755, in first  
    ret = list(self[0:1])  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/query.py", line 2547, in __getitem__  
    return list(res)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 90, in instances  
    util.raise_from_cause(err)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause  
    reraise(type(exception), exception, tb=exc_tb, cause=cause)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/util/compat.py", line 187, in reraise  
    raise value  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 75, in instances  
    rows = [proc(row) for row in fetch]  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 75, in <listcomp>  
    rows = [proc(row) for row in fetch]  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 452, in _instance  
    loaded_instance, populate_existing, populators)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/orm/loading.py", line 513, in _populate_full  
    dict_[key] = getter(row)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/sqlalchemy/sql/sqltypes.py", line 1540, in process  
    return loads(value)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", line 316, in loads  
    return load(file, ignore)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", line 304, in load  
    obj = pik.load()  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/dill/_dill.py", line 465, in find_class  
    return StockUnpickler.find_class(self, module, name)  
ImportError: No module named 'unusual_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1'  
[2018-10-11 21:23:11,823: ERROR/ForkPoolWorker-6] execute_command encountered a CalledProcessError  
Traceback (most recent call last):  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", line 60, in execute_command  
    close_fds=True, env=env)  
  File "/data/python35/lib/python3.5/subprocess.py", line 271, in check_call  
    raise CalledProcessError(retcode, cmd)  
subprocess.CalledProcessError: Command 'airflow run test1_v1 condition 2018-09-01T10:00:00+08:00 --pickle 26 --local' returned non-zero exit status 1  
[2018-10-11 21:23:11,895: ERROR/ForkPoolWorker-6] None  
[2018-10-11 21:23:12,103: ERROR/ForkPoolWorker-6] Task airflow.executors.celery_executor.execute_command[efb4ef09-bdf8-4123-85c8-4dc73dc19d74] raised unexpected: AirflowException('Celery command failed',)  
Traceback (most recent call last):  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", line 60, in execute_command  
    close_fds=True, env=env)  
  File "/data/python35/lib/python3.5/subprocess.py", line 271, in check_call  
    raise CalledProcessError(retcode, cmd)  
subprocess.CalledProcessError: Command 'airflow run test1_v1 condition 2018-09-01T10:00:00+08:00 --pickle 26 --local' returned non-zero exit status 1  
  
During handling of the above exception, another exception occurred:  
  
Traceback (most recent call last):  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py", line 375, in trace_task  
    R = retval = fun(*args, **kwargs)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/celery/app/trace.py", line 632, in __protected_call__  
    return self.run(*args, **kwargs)  
  File "/home/airflow/airflow/venv/lib/python3.5/site-packages/airflow/executors/celery_executor.py", line 65, in execute_command  
    raise AirflowException('Celery command failed')  
airflow.exceptions.AirflowException: Celery command failed   

Why does dag test1_v2 fail? Thanks.

Upvotes: 0

Views: 1555

Answers (1)

hong

Reputation: 31

When I replace python_callable=should_run with python_callable=range, the dag runs successfully, so I guess the reason is that Airflow cannot find should_run, as shown in the log: ImportError: No module named 'unusual_prefix_d47cb71ac291be245f60c8ac0070d906f4627fa1_test1'
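This also explains why python_callable=range works while should_run does not: range is a builtin, so the pickle stores only a reference to the builtins module, which every worker can import. Below is a minimal sketch of the failure (my assumption about the mechanism, not Airflow code itself): dill pickles a module-level function as a reference to its defining module, and Airflow imports DAG files under a mangled unusual_prefix_<hash>_<file> module name that exists only inside the process that created the pickle.

import sys
import types

import dill

# Simulate Airflow importing dags/test1.py under a mangled module name
# ('deadbeef' is a made-up hash, for illustration only).
mod = types.ModuleType('unusual_prefix_deadbeef_test1')
exec("def should_run():\n    return 'oper_1'", mod.__dict__)
sys.modules[mod.__name__] = mod

# dill stores only a reference (module name + attribute name), because the
# module is importable in the pickling process.
payload = dill.dumps(mod.should_run)

# The Celery worker never imported the DAG file under that mangled name, so
# resolving the reference fails, like the find_class frame in the traceback.
del sys.modules[mod.__name__]
try:
    dill.loads(payload)
except ImportError as exc:
    print(exc)  # No module named 'unusual_prefix_deadbeef_test1'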

The solution is:

  • If you trigger the task from the command line, disable pickling: airflow backfill test1_v2 -s 20180901 -e 20180902 -x (the -x flag is --donot_pickle; see the CLI documentation)
  • There is no such problem when the dag is triggered by the airflow scheduler
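
An alternative (an assumption based on the Airflow 1.10 configuration reference, not something I tested here) is to disable DAG pickling globally, so workers re-parse the DAG file from the dags folder instead of unpickling it:

# airflow.cfg -- assumed [core] option in Airflow 1.10; the default is False
[core]
donot_pickle = True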

Upvotes: 1
