Tom Hood

Reputation: 537

Apache Airflow Timeout error when dynamically creating tasks in DAG

In my old DAG, I created tasks like so:

    start_task = DummyOperator(task_id = "start_task")

    t1 = PythonOperator(task_id = "t1", python_callable = get_t1)
    t2 = PythonOperator(task_id = "t2", python_callable = get_t2)
    t3 = PythonOperator(task_id = "t3", python_callable = get_t3)
    t4 = PythonOperator(task_id = "t4", python_callable = get_t4)
    t5 = PythonOperator(task_id = "t5", python_callable = get_t5)
    t6 = PythonOperator(task_id = "t6", python_callable = get_t6)
    t7 = PythonOperator(task_id = "t7", python_callable = get_t7)
    t8 = PythonOperator(task_id = "t8", python_callable = get_t8)
    t9 = PythonOperator(task_id = "t9", python_callable = get_t9)
    t10 = PythonOperator(task_id = "t10", python_callable = get_t10)
    t11 = PythonOperator(task_id = "t11", python_callable = get_t11)

    end_task = DummyOperator(task_id = "end_task")

    start_task >> [t1, t2, t3, t4, t5, t6, t7, t8, t9, t10, t11] >> end_task

Each of these tasks runs a different query, and all the tasks run concurrently. I have revised my code because much of it was redundant and could be moved into functions. In the new code, I also create the tasks dynamically by reading each task's query and metadata from a .json file.
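
For context, each entry in the .json is a dict holding one task's metadata. A rough sketch of what load_info does (the file name and the "query" field here are illustrative; only the "column" key, which becomes the task_id below, appears in my code):

    import json

    def load_info():
        # Load the list of task definitions from the .json file.
        # The file name and the "query" field are illustrative; each entry
        # has at least a "column" key, which is used as the task_id.
        with open("queries.json") as f:
            return json.load(f)
    # e.g. [{"column": "t1", "query": "select ..."}, ...]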

New Code:

    loaded_info = load_info()  # function call to load .json data into a list
    start_task = DummyOperator(task_id = "start_task")
    end_task = DummyOperator(task_id = "end_task")
    tasks = []  # empty list to append tasks to in for loop
    for x in loaded_info:
        qce = QCError(**x)
        id = qce.column
        task = PythonOperator(task_id = id, python_callable = create_task(qce))
        tasks.append(task)
    start_task >> tasks >> end_task

This new code appears fine, but it prevents me from running airflow initdb. After I run the command, the terminal hangs and never finishes until I finally CTRL+C to kill it, at which point it gives me an error:

raise AirflowTaskTimeout(self.error_message)
pandas.io.sql.DatabaseError: Execution failed on sql 'select ..., count(*) as frequency from ... where ... <> all (array['...', '...', etc.]) or ... is null group by ... order by ... asc': Timeout, PID: 315

(Note: the query in the error message above is just the first query in the .json.) Since I never had this error with the old DAG, I assume it is caused by the dynamic task creation, but I need help identifying exactly what is causing it.

What I have tried:

Upvotes: 0

Views: 7215

Answers (1)

Tom Hood

Reputation: 537

I finally managed to get airflow initdb to run (I have not yet tested my job, and will update on its status later).

It turns out that when defining a PythonOperator, you cannot call the callable inline like I was doing:

    task = PythonOperator(task_id = id, python_callable = create_task(qce))

Passing qce into create_task is what was causing the error: create_task(qce) is evaluated while Airflow parses the DAG file, so the query runs at parse time (which is why airflow initdb timed out) instead of at task execution time. To pass arguments into your tasks, use op_args/op_kwargs instead.
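
As a minimal sketch of the difference (a simplified create_task, not my actual one):

    def create_task(**kwargs):
        # placeholder for the real callable, which runs one QC query
        pass

    # Wrong: create_task(qce) is *called* while Airflow parses the DAG file,
    # so the query executes at parse time and its return value (not a
    # function) is handed to the operator:
    task = PythonOperator(task_id = id, python_callable = create_task(qce))

    # Right: hand Airflow the function itself and let it supply the
    # arguments at execution time via op_kwargs:
    task = PythonOperator(task_id = id, python_callable = create_task, op_kwargs = x)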

For those of you who want to see the fix for my exact case, I have this:

    with DAG("dva_event_analysis_dag", default_args = DEFAULT_ARGS, schedule_interval = None, catchup = False) as dag:
        loaded_info = load_info()
        start_task = DummyOperator(task_id = "start_task")
        end_task = DummyOperator(task_id = "end_task")
        tasks = []
        for x in loaded_info:
            id = x["column"]
            task = PythonOperator(task_id = id, provide_context = True, python_callable = create_task, op_kwargs = x)
            tasks.append(task)
        start_task >> tasks >> end_task

Update (7/03/2019): Job status is successful. This was indeed the fix to my error. Hopefully this helps out others with similar issues.

Upvotes: 1
