Reputation: 13
I am new to Airflow and am trying to use it to build a data pipeline, but it keeps raising exceptions. My airflow.cfg looks like this:
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
sql_alchemy_pool_size = 5
parallelism = 96
dag_concurrency = 96
worker_concurrency = 96
max_threads = 96
broker_url = postgresql+psycopg2://airflow:airflow@localhost/airflow
result_backend = postgresql+psycopg2://airflow:airflow@localhost/airflow
When I start airflow webserver -p 8080
in one terminal and then airflow scheduler
in another terminal, the scheduler fails with the exception below. It fails only when I set parallelism above a certain value and works fine otherwise; the threshold may be machine-specific, but the failure is clearly tied to the parallelism setting. I have tried running 1000 Python processes on my computer and that worked fine, and I have configured Postgres to allow a maximum of 500 database connections, but I still get the errors.
[2019-11-20 12:15:00,820] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 85050
Process QueuedLocalWorker-18:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 811, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/edward/.local/share/virtualenvs/avat-utils-JpGzQGRW/lib/python3.7/site-packages/airflow/executors/local_executor.py", line 111, in run
key, command = self.task_queue.get()
File "<string>", line 2, in get
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 815, in _callmethod
self._connect()
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 802, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
c = SocketClient(address)
File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused
Thanks
Update: I tried running it from PyCharm and it worked fine there, but from the terminal it fails intermittently.
Upvotes: 1
Views: 1625
Reputation: 105
I had the same issue. It turned out I had set max_threads=10 in airflow.cfg in combination with LocalExecutor. Switching to max_threads=2 solved the issue.
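A quick check along these lines can flag oversized values before the scheduler is started. This is only a sketch: the helper name check_concurrency and the default limit of 4 are hypothetical, not anything Airflow defines, and it assumes parallelism lives under [core] and max_threads under [scheduler], since the question's snippet omits the section headers.

```python
import configparser

# Sample text with assumed section headers; values taken from this thread.
SAMPLE_CFG = """
[core]
executor = LocalExecutor
parallelism = 96

[scheduler]
max_threads = 10
"""


def check_concurrency(cfg_text, limit=4):
    """Return a warning string for each option above `limit` (hypothetical threshold)."""
    # Interpolation is disabled so literal '%' characters in a config do not raise.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    warnings = []
    for section, option in (("core", "parallelism"), ("scheduler", "max_threads")):
        # A missing section or option falls back to 0 and is never flagged.
        value = parser.getint(section, option, fallback=0)
        if value > limit:
            warnings.append(f"{section}.{option}={value} exceeds {limit}")
    return warnings


if __name__ == "__main__":
    for line in check_concurrency(SAMPLE_CFG):
        print(line)
```

What counts as a sensible limit depends on the machine, so treat the threshold as something to tune rather than a recommendation.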
Upvotes: 1
Reputation: 13
I found out a few days ago that Airflow actually starts all of the parallel processes at startup. I had been thinking of the max_* settings and parallelism as capacity limits, but they are the number of processes it launches when it starts. So it looks like this issue is caused by insufficient resources on the computer.
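To make the eager start concrete, here is a minimal sketch of the pattern using plain multiprocessing. It is not Airflow's actual code: the names worker and run_pool are made up for illustration, and it assumes a POSIX system where the fork start method is available. The only parts taken from the traceback are that each worker process calls get() on a manager-backed queue and has to open its own connection to the manager process first.

```python
import multiprocessing as mp


def worker(task_queue, result_queue):
    # The first get() makes this process open its own connection to the
    # manager process, like the task_queue.get() frame in the traceback.
    while True:
        item = task_queue.get()
        if item is None:  # sentinel: no more work for this worker
            break
        result_queue.put(item * 2)


def run_pool(parallelism, tasks):
    """Start `parallelism` workers up front, then feed them `tasks`."""
    ctx = mp.get_context("fork")  # assumption: POSIX with fork available
    with ctx.Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()
        # Every worker is created and started before any task is queued,
        # so the process count equals `parallelism` even with no work to do.
        workers = [
            ctx.Process(target=worker, args=(task_queue, result_queue))
            for _ in range(parallelism)
        ]
        for w in workers:
            w.start()
        for t in tasks:
            task_queue.put(t)
        for _ in workers:
            task_queue.put(None)  # one sentinel per worker
        for w in workers:
            w.join()
        # Each task produced exactly one result, so collect that many.
        return sorted(result_queue.get() for _ in tasks)


if __name__ == "__main__":
    print(run_pool(4, [1, 2, 3]))
```

With a large parallelism value, all workers try to connect to the manager at roughly the same moment. That is one plausible way to end up with the Connection refused error from the question, though I have not confirmed it is the exact cause.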
Upvotes: 0