Reputation: 385
In Airflow, I use an SSHOperator to call an API that performs some automation work. The command ran successfully and the report was generated, but Airflow marks the task as failed because of a socket exception.
This error occurs intermittently, and I would like to know what causes it.
The error message received:
[2021-07-20 08:00:07,345] {ssh.py:109} INFO - Running command: curl -u <user:pw> <URL>
[2021-07-20 08:00:07,414] {ssh.py:145} WARNING - % Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
0 0 0 0 0 0 0 0 --:--:-- --:--:-- --:--:-- 0
[2021-07-20 08:00:08,420] {ssh.py:145} WARNING -
0 0 0 0 0 0 0 0 --:--:-- 0:00:01 --:--:-- 0
[2021-07-20 08:00:09,421] {ssh.py:145} WARNING -
0 0 0 0 0 0 0 0 --:--:-- 0:00:02 --:--:-- 0
[2021-07-20 08:00:10,423] {ssh.py:145} WARNING -
0 0 0 0 0 0 0 0 --:--:-- 0:00:03 --:--:-- 0
[2021-07-20 08:00:10,615] {ssh.py:141} INFO - Report Sent Successfully.
[2021-07-20 08:00:10,616] {transport.py:1819} ERROR - Socket exception: Bad file descriptor (9)
[2021-07-20 08:00:10,633] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 152, in execute
stdout.channel.close()
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/channel.py", line 671, in close
self.transport._send_user_message(m)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/transport.py", line 1863, in _send_user_message
self._send_message(data)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/transport.py", line 1839, in _send_message
self.packetizer.send_message(data)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/packet.py", line 431, in send_message
self.write_all(out)
File "/u01/airflow-venv/lib/python3.8/site-packages/paramiko/packet.py", line 367, in write_all
raise EOFError()
EOFError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1336, in _execute_task
result = task_copy.execute(context=context)
File "/u01/airflow-venv/lib/python3.8/site-packages/airflow/providers/ssh/operators/ssh.py", line 171, in execute
raise AirflowException(f"SSH operator error: {str(e)}")
airflow.exceptions.AirflowException: SSH operator error:
--- edit ---
from airflow.providers.ssh.operators.ssh import SSHOperator

generate_report = SSHOperator(
    task_id='generate_report',
    ssh_conn_id='ssh_123',
    command='curl -u user:password "http://localhost:1234/path/to/trigger/report_creation_API?async=false"',
)
Upvotes: 1
Views: 1331
Reputation: 20097
This is a race condition in the paramiko library. One line above this close() call, the operator shuts down the read side of the channel, and at the very beginning of the same method it shuts down the write side. After the second shutdown the channel should be closed, and that is likely what happens under the hood in paramiko.
However, this teardown seems to happen asynchronously in a separate thread. Depending on which thread gets there first, the socket may already be closed by the time the operator calls close(). If paramiko's async thread was faster, the operator attempts to close an already-closed socket, and the error is thrown. This is a classic race condition.
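To see why this only fails sometimes, here is a toy reproduction with plain sockets (not paramiko code; all names here are mine): a background thread closes the socket first, and the subsequent attempt to send a closing handshake hits a dead file descriptor, just like close() does in the traceback above.

import socket
import threading
import time

# Two endpoints sharing one connection; 'peer' only exists so the
# socket pair is complete.
sock, peer = socket.socketpair()

def async_teardown(s):
    # Stands in for paramiko's transport thread tearing the socket down.
    time.sleep(0.01)
    s.close()

t = threading.Thread(target=async_teardown, args=(sock,))
t.start()
t.join()  # the background thread won the race

try:
    # Stands in for close() sending the SSH channel-close packet.
    sock.send(b"channel close")
except OSError as e:
    print(f"Socket exception: {e}")  # [Errno 9] Bad file descriptor

peer.close()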
Since closing an already-closed connection is basically a no-op, we can safely ignore such an exception. This is what I just did in this PR:
https://github.com/apache/airflow/pull/17528
It will most likely be released in the next wave of providers (likely end of August).
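The shape of that fix is simply to tolerate the failure when closing the channel. A minimal sketch of the approach (the helper name is mine; this is a paraphrase of the idea, not the exact diff in the PR):

from paramiko.channel import Channel

def close_channel_quietly(channel: Channel) -> None:
    # Closing a channel whose socket the transport thread has already
    # torn down raises EOFError; the channel is effectively closed at
    # that point, so the error is safe to swallow.
    try:
        channel.close()
    except EOFError:
        pass

Inside the operator, this pattern would wrap the stdout.channel.close() call that appears in the traceback above.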
Upvotes: 3