Nicolaz
Nicolaz

Reputation: 179

Airflow task randomly exited with return code 1 [Local Executor / PythonOperator]

To give some context, I am using Airflow 2.3.0 on Kubernetes with the Local Executor (which may sound weird, but it works for us for now) with one pod for the webserver and two for the scheduler.

I have a DAG consisting of a single task (PythonOperator) that makes many API calls (200K) using requests. Every 15 calls, the data is loaded in a DataFrame and stored on AWS S3 (using boto3) to reduce the RAM usage.

The problem is that I can't get to the end of this task because it goes into error randomly (after 1, 10 or 120 minutes).

I have made more than 50 tries, no success and the only logs on the task are:

[2022-09-01, 14:45:44 UTC] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: INGESTION-DAILY-dag.extract_task scheduled__2022-08-30T00:00:00+00:00 [queued]>
[2022-09-01, 14:45:44 UTC] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: INGESTION-DAILY-dag.extract_task scheduled__2022-08-30T00:00:00+00:00 [queued]>
[2022-09-01, 14:45:44 UTC] {taskinstance.py:1356} INFO - 
--------------------------------------------------------------------------------
[2022-09-01, 14:45:44 UTC] {taskinstance.py:1357} INFO - Starting attempt 23 of 24
[2022-09-01, 14:45:44 UTC] {taskinstance.py:1358} INFO - 
--------------------------------------------------------------------------------
[2022-09-01, 14:45:44 UTC] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): extract_task> on 2022-08-30 00:00:00+00:00
[2022-09-01, 14:45:44 UTC] {standard_task_runner.py:52} INFO - Started process 942 to run task
[2022-09-01, 14:45:44 UTC] {standard_task_runner.py:79} INFO - Running: ['airflow', 'tasks', 'run', 'INGESTION-DAILY-dag', 'extract_task', 'scheduled__2022-08-30T00:00:00+00:00', '--job-id', '4390', '--raw', '--subdir', 'DAGS_FOLDER/dags/ingestion/daily_dag/dag.py', '--cfg-path', '/tmp/tmpwxasaq93', '--error-file', '/tmp/tmpl7t_gd8e']
[2022-09-01, 14:45:44 UTC] {standard_task_runner.py:80} INFO - Job 4390: Subtask extract_task
[2022-09-01, 14:45:45 UTC] {task_command.py:369} INFO - Running <TaskInstance: INGESTION-DAILY-dag.extract_task scheduled__2022-08-30T00:00:00+00:00 [running]> on host 10.XX.XXX.XXX

[2022-09-01, 14:48:17 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2022-09-01, 14:48:17 UTC] {taskinstance.py:1395} INFO - Marking task as UP_FOR_RETRY. dag_id=INGESTION-DAILY-dag, task_id=extract_task, execution_date=20220830T000000, start_date=20220901T144544, end_date=20220901T144817
[2022-09-01, 14:48:17 UTC] {local_task_job.py:273} INFO - 0 downstream tasks scheduled from follow-on schedule check

But when I go to the pod logs, I get the following message:

[2022-09-01 14:06:31,624] {local_executor.py:128} ERROR - Failed to execute task an integer is required (got type ChunkedEncodingError).
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/local_executor.py", line 124, in _execute_work_in_fork
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 377, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 183, in _run_task_by_selected_method
    _run_task_by_local_task_job(args, ti)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 241, in _run_task_by_local_task_job
    run_job.run()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 244, in run
    self._execute()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/local_task_job.py", line 105, in _execute
    self.task_runner.start()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 41, in start
    self.process = self._start_by_fork()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 125, in _start_by_fork
    os._exit(return_code)
TypeError: an integer is required (got type ChunkedEncodingError)

What I find strange is that I never had this error on other DAGs (where tasks are smaller and faster). I checked, during an attempt, CPU and RAM usages are stable and low.

I have the same error locally, I also tried to upgrade to 2.3.4 but nothing works.

Do you have any idea how to fix this?

Thanks a lot! Nicolas

Upvotes: 1

Views: 1176

Answers (1)

Nicolaz
Nicolaz

Reputation: 179

As @EDG956 said, this is not an error from Airflow but from the code.

I solved it using a context manager (which was not enough) and recreating a session:

s = requests.Session()

while True:
    try:
        with s.get(base_url) as r:
            response = r
    except requests.exceptions.ChunkedEncodingError:
        s.close()
        s.requests.Session()
        response = s.get(base_url)

Upvotes: 0

Related Questions