Reputation: 2421
I am using the PythonOperator to call a function that parallelizes a data engineering process as an Airflow task. This is done simply by wrapping the function with a callable wrapper function that Airflow calls.
def wrapper(ds, **kwargs):
    process_data()
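The task itself is wired up roughly like this (a simplified sketch with placeholder names, not the exact code; in Airflow 1.10.x, provide_context=True is what passes ds and **kwargs to the callable):
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="process_data_dag",        # placeholder DAG id
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
)

process_task = PythonOperator(
    task_id="process_data",           # placeholder task id
    python_callable=wrapper,          # the wrapper defined above
    provide_context=True,             # Airflow 1.x: pass ds/**kwargs to the callable
    dag=dag,
)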
process_data achieves parallelization using the multiprocessing module, which spawns subprocesses. When I run process_data all by itself from a Jupyter notebook, it runs to the end with no problem. However, when I run it using Airflow, the task fails and the log for the task shows something like this:
[2019-01-22 17:16:46,966] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,969] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-129:
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - Traceback (most recent call last):
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
raise AirflowException("Task received SIGTERM signal")
[2019-01-22 17:16:46,973] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 17:16:46,993] {models.py:1610} ERROR - Received SIGTERM. Terminating subprocesses.
[2019-01-22 17:16:46,996] {logging_mixin.py:95} WARNING - Process ForkPoolWorker-133:
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - Traceback (most recent call last):
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
self.run()
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/pool.py", line 108, in worker
task = get()
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/queues.py", line 343, in get
res = self._reader.recv_bytes()
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/usr/lib/python3.5/multiprocessing/synchronize.py", line 99, in __exit__
return self._semlock.__exit__(*args)
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - File "/home/airflow/.env/lib/python3.5/site-packages/airflow/models.py", line 1612, in signal_handler
raise AirflowException("Task received SIGTERM signal")
[2019-01-22 17:16:46,999] {logging_mixin.py:95} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 17:16:47,086] {logging_mixin.py:95} INFO - file parsing and processing 256.07
[2019-01-22 17:17:12,938] {logging_mixin.py:95} INFO - combining and sorting 25.85
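For context, process_data is along these lines (a simplified sketch rather than the actual code; the Pool workers correspond to the ForkPoolWorker processes in the log above):
from multiprocessing import Pool

def process_one_file(path):
    # placeholder for the per-file parsing/processing work
    pass

def process_data():
    files = []  # list of inputs to process (omitted here)
    # Pool forks the ForkPoolWorker-N subprocesses seen in the log
    with Pool(processes=8) as pool:
        pool.map(process_one_file, files)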
I am not quite sure why the task receives SIGTERM. My guess is that some higher-level process is sending the signal to the subprocesses. What should I do to debug this issue?
Just noticed that towards the end of the log for the task, it clearly states:
airflow.exceptions.AirflowException: Task received SIGTERM signal
[2019-01-22 12:31:39,196] {models.py:1764} INFO - Marking task as FAILED.
Upvotes: 36
Views: 75297
Reputation: 39790
I was having the same problem after upgrading from Airflow v1.10.15 to v2.2.5, and was seeing the error in long-running DAGs with a fairly high number of tasks.
Apparently, the dagrun_timeout in airflow.models.DAG was not respected in earlier Airflow versions, and I noticed that the DAGs I was trying to migrate to the new Airflow instance were running for much longer than the specified dagrun_timeout.
The solution for me was to increase the dagrun_timeout (e.g. dagrun_timeout=datetime.timedelta(minutes=120)).
Note that this parameter is effective only for scheduled runs (in other words, for DAGs with a specified schedule_interval).
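For example (a minimal sketch with a placeholder DAG id and schedule; pick a timeout that comfortably covers your longest run):
import datetime

from airflow import DAG

dag = DAG(
    dag_id="long_running_dag",                       # placeholder
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval="@daily",                      # timeout only applies to scheduled runs
    dagrun_timeout=datetime.timedelta(minutes=120),  # generous upper bound for the run
)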
Upvotes: 0
Reputation: 401
This might help if you use a Mac (not sure about all versions though; in my case it is macOS 12.1 Monterey, Python 3.8).
I tried some of the solutions above, to no avail. Then I noticed in the log for the failed task a warning, immediately followed by the SIGTERM signal being sent by the scheduler.
In airflow.cfg, the Python function socket.getfqdn is used by default to infer the hostname (look for the line hostname_callable = socket.getfqdn). I did some digging (here), and it turns out this function behaves somewhat differently on a Mac.
I replaced the line to use gethostname (hostname_callable = socket.gethostname), restarted the scheduler, et voilà!
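For reference, the relevant lines in airflow.cfg end up looking like this:
[core]
# hostname_callable = socket.getfqdn   <- default, problematic on my Mac
hostname_callable = socket.gethostname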
In retrospect, the scheduler was trying to contact the task to check its health state (as a well-behaved scheduler usually does), but could not reach it with the originally inferred hostname. It would therefore assume an unhealthy state and send the kill signal.
Upvotes: 2
Reputation: 321
DAGs that run longer or have more than 10-12 tasks (that may run long) seem to have a higher probability of getting SIGTERM.
PROBABLE SOLUTIONS:
Set schedule_after_task_execution to False.
If True, the task supervisor process runs a "mini scheduler": after marking the task as success, it calls _run_mini_scheduler_on_child_tasks. Meanwhile, local_task_job.py detects the task as successful and no longer running, and terminates the process, which might still be executing under _run_mini_scheduler_on_child_tasks.
The problem may also be caused by the scheduler health check threshold being set smaller than the scheduler heartbeat interval.
The default scheduler_health_check_threshold was 30 seconds and scheduler_heartbeat_sec was 60 seconds. During the check for orphaned tasks, the scheduler heartbeat was determined to be older than 30 seconds, which makes sense, because it was only heartbeating every 60 seconds. The scheduler was therefore inferred to be unhealthy and was possibly terminated as a result.
The logs in my case show that the SIGTERM is associated with tasks being considered orphaned, and the timestamps coincide closely with the SIGTERM received by the task. It seems that since the SchedulerJob was marked as failed, the TaskInstance running the actual task was considered an orphan and thus marked for termination. I changed these values to:
scheduler_heartbeat_sec = 60
scheduler_health_check_threshold = 120 (twice that of scheduler_heartbeat_sec)
Some explanations blamed the metadata database being pushed to 100% CPU while running DAGs with multiple tasks in parallel, such as in my case.
Scaling up the database may solve the issue, but instead increasing job_heartbeat_sec along with the other three configurations above actually solved the problem:
job_heartbeat_sec = 60 (roughly based on the time taken by the task)
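Put together, the relevant section of airflow.cfg looked roughly like this (these are the values that worked for my setup, not universal defaults):
[scheduler]
schedule_after_task_execution = False
scheduler_heartbeat_sec = 60
scheduler_health_check_threshold = 120
job_heartbeat_sec = 60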
Upvotes: 1
Reputation: 116
This is probably way overdue, but what solved my issue was increasing the dagrun_timeout parameter. You should check whether your execution time fits within that window.
Upvotes: 0
Reputation: 156
I followed the answer here. The idea is the same: Not letting Airflow close threads too early:
export AIRFLOW__CORE__KILLED_TASK_CLEANUP_TIME=604800
did the trick.
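Equivalently, the same value can be set in airflow.cfg (the environment variable above maps to this key):
[core]
killed_task_cleanup_time = 604800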
Upvotes: 12
Reputation: 85
I too had a similar issue when running Python multithreaded code. I was able to resolve it by joining the threads; Airflow then waits until all threads have finished executing before sending SIGTERM.
from threading import Thread

threads = []  # list to hold all the threads

t = Thread(...)      # create each thread (target/args omitted here)
threads.append(t)    # add every thread to the list

# Start all threads
for x in threads:
    x.start()

# Wait for all of them to finish before the callable returns
for x in threads:
    x.join()
Upvotes: 1