Edward

Reputation: 13

Airflow scheduler starts up with exception when parallelism is set to a large number

I am new to Airflow and I am trying to use it to build a data pipeline, but I keep running into exceptions. My airflow.cfg looks like this:

executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
sql_alchemy_pool_size = 5
parallelism = 96
dag_concurrency = 96
worker_concurrency = 96
max_threads = 96
broker_url = postgresql+psycopg2://airflow:airflow@localhost/airflow
result_backend = postgresql+psycopg2://airflow:airflow@localhost/airflow

When I start airflow webserver -p 8080 in one terminal and then airflow scheduler in another terminal, the scheduler throws the following exception. It fails when I set parallelism above a certain number and works fine otherwise. The exact threshold may be machine-specific, but the failure is clearly tied to parallelism. I have tried running 1000 Python processes on my computer and that worked fine, and I have configured Postgres to allow a maximum of 500 database connections, but I still get the errors.

[2019-11-20 12:15:00,820] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 85050
Process QueuedLocalWorker-18:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 811, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/edward/.local/share/virtualenvs/avat-utils-JpGzQGRW/lib/python3.7/site-packages/airflow/executors/local_executor.py", line 111, in run
    key, command = self.task_queue.get()
  File "<string>", line 2, in get
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 815, in _callmethod
    self._connect()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 802, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

Thanks

Update: I tried running it from PyCharm and it worked fine there, but in the terminal it sometimes fails and sometimes does not.

Upvotes: 1

Views: 1625

Answers (2)

the_stack_over_flew

Reputation: 105

I had the same issue. It turned out I had set max_threads=10 in airflow.cfg in combination with LocalExecutor. Switching to max_threads=2 solved the issue.
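If you want to check a config for this before starting the scheduler, something like the following rough sketch could work. It only parses the text and reports which process-count settings exceed a limit you pick. The function name oversized_settings, the SAMPLE text and the limit value are all made up for illustration, and I am assuming parallelism lives under [core] and max_threads under [scheduler], since the question does not show the section headers.

```python
import configparser

# Hypothetical sample; section names are an assumption, values come from the question.
SAMPLE = """
[core]
executor = LocalExecutor
parallelism = 96

[scheduler]
max_threads = 96
"""


def oversized_settings(cfg_text, limit):
    """Return {setting: value} for watched settings whose value exceeds limit."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # Settings that control how many processes get launched.
    watched = [("core", "parallelism"), ("scheduler", "max_threads")]
    found = {}
    for section, key in watched:
        if parser.has_option(section, key):
            value = parser.getint(section, key)
            if value > limit:
                found[key] = value
    return found
```

For example, oversized_settings(SAMPLE, 8) flags both settings, while a limit of 100 flags nothing. What limit makes sense depends on your machine; the sketch does not try to guess it.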

Upvotes: 1

Edward

Reputation: 13

I found out a few days ago that Airflow actually starts all of the parallel worker processes when it starts up. I had been thinking of the max_* settings and parallelism as a capacity limit, but they are the number of processes launched at startup. So it looks like this issue is caused by insufficient resources on the computer.
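To make the startup behaviour concrete, here is a minimal sketch of the pattern (this is not Airflow's actual code): a fixed number of worker processes are all started up front, and each one immediately blocks on get() from a queue served by a multiprocessing manager. Each worker has to open its own connection to the manager process, which is the step that gets refused in the traceback. The names worker and run_pool are made up, and the "fork" start method is an assumption that limits the sketch to Unix-like systems.

```python
import multiprocessing as mp


def worker(task_queue, result_queue):
    # Each worker blocks on the shared queue right away, similar to
    # the task_queue.get() call shown in the traceback.
    while True:
        item = task_queue.get()
        if item is None:  # sentinel: no more work for this worker
            break
        result_queue.put(item * 2)


def run_pool(parallelism, tasks):
    # Assumption: "fork" start method (Unix only).
    ctx = mp.get_context("fork")
    with ctx.Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()
        # All workers are started eagerly, before any task is queued.
        workers = [
            ctx.Process(target=worker, args=(task_queue, result_queue))
            for _ in range(parallelism)
        ]
        for w in workers:
            w.start()
        for t in tasks:
            task_queue.put(t)
        # One sentinel per worker so every process exits.
        for _ in workers:
            task_queue.put(None)
        for w in workers:
            w.join()
        # Workers have exited, so every result is already in the queue.
        results = []
        while not result_queue.empty():
            results.append(result_queue.get())
        return sorted(results)
```

Calling run_pool(4, [1, 2, 3]) starts four processes even though there are only three tasks, and returns [2, 4, 6]. With a large value such as 96, that many processes all connect to the manager at roughly the same time, regardless of how much work is queued.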

Upvotes: 0
