Reputation: 13
We're running Airflow v1.9.0 and having problems with one DAG. This DAG (called matching) consists of a single SSHOperator task, starts at 0 6 * * *, and usually completes within the hour. Once a month, we have a large data ingestion that causes this task to take up to 7 hours. Unfortunately, when this happens, the DAG stops our other DAGs from starting until it completes. It is the only DAG running during this 7-hour period. This isn't the normal behavior of our other DAGs (while they run, other DAGs still start). We could not find any table locks (PostgreSQL) that might be causing this issue.
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator
from datetime import datetime, timedelta
from os import path
PATH = '/path/to/code/'
default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 1, 24),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}
with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
    nightly_matching_command = f.read()
dag = DAG('matching',
          template_searchpath=PATH,
          schedule_interval='0 6 * * *',  #10:00PM
          default_args=default_args)
nightly_matching = SSHOperator(task_id='nightly_matching',
                               ssh_conn_id='external_server_name',
                               command=nightly_matching_command,
                               do_xcom_push=True,
                               dag=dag)
Here are the query results when I look at the airflow database.
airflow=# select task_id, dag_id, start_date, execution_date,end_date, duration, state, pool, queue from task_instance where date_trunc('day', start_date)::DATE = '2018-05-09'::DATE order by start_date;
task_id: nightly_matching
dag_id: matching
start_date: 2018-05-09 06:00:04.486709
execution_date: 2018-05-08 06:00:00
end_date: 2018-05-09 12:50:52.509942
duration: 24648.023233
state: success
pool:
queue: default
The DAG that immediately follows the problem DAG is scheduled to start at 10:40, but its task did not start until 12:51, right after the problem DAG finished.
task_id: Task1
dag_id: dag2
start_date: 2018-05-09 12:51:04.963004
execution_date: 2018-05-08 10:40:00
end_date: 2018-05-09 12:51:07.060369
duration: 2.097365
state: success
pool:
queue: default
run_nightly_matching.sh runs a Python script that uses psycopg2 to connect to our database and match pairs of tables.
*** Reading local log.
[2018-05-09 06:00:04,352] {cli.py:374} INFO - Running on host airflow
[2018-05-09 06:00:04,486] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,587] {models.py:1197} INFO - Dependencies all met for <TaskInstance: matching.nightly_matching 2018-05-08 06:00:00 [queued]>
[2018-05-09 06:00:04,588] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 2
--------------------------------------------------------------------------------
[2018-05-09 06:00:04,617] {models.py:1428} INFO - Executing <Task(SSHOperator): nightly_matching> on 2018-05-08 06:00:00
[2018-05-09 06:00:04,617] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run matching nightly_matching 2018-05-08T06:00:00 --job_id 274416 --raw -sd DAGS_FOLDER/matching.py']
[2018-05-09 06:00:04,981] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:04,980] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-05-09 06:00:05,023] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,022] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
[2018-05-09 06:00:05,140] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,139] {__init__.py:45} INFO - Using executor LocalExecutor
[2018-05-09 06:00:05,190] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,190] {models.py:189} INFO - Filling up the DagBag from /home/airflow/airflow/dags/matching.py
[2018-05-09 06:00:05,445] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,444] {base_hook.py:80} INFO - Using connection to: external_server_name
[2018-05-09 06:00:05,511] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,509] {transport.py:1687} INFO - Connected (version 2.0, client OpenSSH_6.6.1p1)
[2018-05-09 06:00:05,621] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 06:00:05,620] {transport.py:1687} INFO - Authentication (publickey) successful!
[2018-05-09 12:50:52,431] {base_task_runner.py:98} INFO - Subtask: [2018-05-09 12:50:52,430] {ssh_operator.py:113} INFO - Matching: ('table_Z', 'table_Y')
[2018-05-09 12:50:52,432] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:00:58.172398
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-08 23:17:19.273990
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_R')
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-08 23:17:19.274092
[2018-05-09 12:50:52,433] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 03:16:46.339119
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'table_Y')
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 03:16:46.339228
[2018-05-09 12:50:52,434] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 04:49:16.901285
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_X', 'clients')
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 04:49:16.901410
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:30.502418
[2018-05-09 12:50:52,435] {base_task_runner.py:98} INFO - Subtask: Matching: ('clients', 'table_R')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:30.502494
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:02:47.035880
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_Y', 'table_B')
[2018-05-09 12:50:52,436] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:02:47.035974
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:03:17.464931
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_E', 'table_Y')
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:03:17.465061
[2018-05-09 12:50:52,437] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:43:05.543177
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Matching: ('table_P', 'table_Y')
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Started at: 2018-05-09 05:43:05.543298
[2018-05-09 12:50:52,438] {base_task_runner.py:98} INFO - Subtask: Finished at: 2018-05-09 05:51:42.683216
Upvotes: 0
Views: 802
Reputation: 8239
I think this could be connected to the number of executed tasks and to this bit:
with open(path.join(PATH, 'matching', 'run_nightly_matching.sh')) as f:
nightly_matching_command = f.read()
If I am not mistaken, this code could be executed far more often than planned, since it is not part of a task and is not tied to any trigger. It runs every time the Python file itself is executed, which may happen rarely or very often depending on your setup. Since a file handle is opened each time, this might be connected to the problem. It might be nothing, but it could be worthwhile to inspect.
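To make the point about module-level code concrete, here is a minimal sketch that does not use Airflow at all. It writes a hypothetical stand-in for the DAG file to a temporary directory, executes it several times with `runpy.run_path`, and counts how often the top-level `open()`/`read()` ran. The file `parse_count.log`, the helper `simulate_parses`, and the script contents are assumptions made for illustration; how often Airflow actually re-executes a DAG file depends on your setup.

```python
import os
import runpy
import tempfile

# Hypothetical stand-in for a DAG file: the open()/read() sits at module
# level, outside any task, as in the question. The extra log write exists
# only so we can count how often the top level was executed.
DAG_FILE_SOURCE = '''
import os
here = os.path.dirname(os.path.abspath(__file__))
with open(os.path.join(here, "run_nightly_matching.sh")) as f:
    nightly_matching_command = f.read()
with open(os.path.join(here, "parse_count.log"), "a") as log:
    log.write("parsed\\n")
'''


def simulate_parses(n):
    """Execute the stand-in DAG file n times; return (run count, command)."""
    workdir = tempfile.mkdtemp()
    with open(os.path.join(workdir, "run_nightly_matching.sh"), "w") as f:
        f.write("echo matching\n")
    dag_path = os.path.join(workdir, "matching.py")
    with open(dag_path, "w") as f:
        f.write(DAG_FILE_SOURCE)

    last_globals = {}
    for _ in range(n):
        # Each execution of the file re-runs everything at module level.
        last_globals = runpy.run_path(dag_path)

    with open(os.path.join(workdir, "parse_count.log")) as f:
        count = len(f.read().splitlines())
    return count, last_globals["nightly_matching_command"]


count, command = simulate_parses(3)
print(count, repr(command))
```

The count grows with every execution of the file, not with every task run. Note that the `with` block closes the file handle each time, so this pattern costs repeated reads rather than leaving handles open.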
Furthermore, I couldn't look into the cfg file since it was stored on Dropbox and I would have had to log in.
Here the number of threads allowed to run concurrently could become interesting, as this sounds like a blocking issue. It could also be that the DAG itself is not stopping the other tasks, but that the load on the machine climbs high enough to starve them of resources. Hard to say without specifics.
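As a starting point for checking the concurrency limits, the sketch below pulls the settings I would look at first out of an `airflow.cfg`-style text. The keys `parallelism`, `dag_concurrency`, `max_active_runs_per_dag`, and `max_threads` are options I know from Airflow 1.x configuration files; the values in `SAMPLE_CFG` are placeholders and not the asker's actual settings, and `concurrency_settings` is a hypothetical helper. It is written for Python 3 (`configparser`); on Python 2.7 the module is named `ConfigParser` and differs slightly.

```python
import configparser

# Placeholder excerpt for illustration only; replace it with the text of
# your own airflow.cfg. These values are not taken from the question.
SAMPLE_CFG = """
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2
"""

# (section, option) pairs that limit how much can run at the same time.
CONCURRENCY_KEYS = [
    ("core", "parallelism"),
    ("core", "dag_concurrency"),
    ("core", "max_active_runs_per_dag"),
    ("scheduler", "max_threads"),
]


def concurrency_settings(cfg_text):
    """Return the concurrency-related options found in cfg_text as ints."""
    # interpolation=None keeps '%' characters in a real airflow.cfg
    # (for example in log format strings) from raising errors.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    return {
        "{}.{}".format(section, option): parser.getint(section, option)
        for section, option in CONCURRENCY_KEYS
        if parser.has_option(section, option)
    }


settings = concurrency_settings(SAMPLE_CFG)
for name, value in sorted(settings.items()):
    print(name, "=", value)
```

If any of these turn out to be set very low, a single long-running task could plausibly occupy the only available slot, though that is a guess until the real configuration is visible.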
Upvotes: 1