Soundar Raj

Reputation: 143

Airflow Worker Configuration

I am new to Airflow. I'm trying to set up Airflow in distributed mode using the Celery executor by following this article: https://stlong0521.github.io/20161023%20-%20Airflow.html

Before getting into the details of the setup, I would like to confirm that I've installed PostgreSQL on a separate instance.

The specification of the setup is detailed below:

Airflow core/server computer

Configurations made in airflow.cfg:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow
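
For reference, the Airflow 1.x commands for bringing up the core machine with this configuration look roughly like the sketch below (the port and the exact sequence are just examples):

# create the Airflow metadata tables in the PostgreSQL database configured above
airflow initdb

# start the web UI and the scheduler on the core machine
airflow webserver -p 8080
airflow scheduler

# optionally start Flower to monitor the Celery workers
airflow flower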

Tests performed:

RabbitMQ is running
Can connect to PostgreSQL and have confirmed that Airflow has created tables
Can start and view the webserver (including custom dags)
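
These checks can be run from the shell roughly like this (a sketch; the host, port, user, and database names come from the config above):

# check that RabbitMQ is up
sudo rabbitmqctl status

# connect to PostgreSQL and list the tables Airflow created
psql -h 192.168.16.149 -p 5432 -U username -d airflow -c "\dt"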

Airflow worker computer

Has the following installed:

Configurations made in airflow.cfg are exactly the same as on the server:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:[email protected]:5672//
celery_result_backend = db+postgresql://username:[email protected]:5432/airflow
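
On this machine the Celery worker is started with the Airflow 1.x CLI, roughly as sketched below (the queue name in the second command is hypothetical and can be omitted to use the default queue):

# start a Celery worker that picks up tasks from the broker configured above
airflow worker

# optionally pin the worker to a specific queue (queue name is just an example)
# airflow worker -q my_queue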

Output from commands run on the worker machine:

When running airflow flower:

[2018-02-19 14:58:14,276] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-02-19 14:58:14,360] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-02-19 14:58:14,384] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[I 180219 14:58:15 command:139] Visit me at http://0.0.0.0:5555
[I 180219 14:58:15 command:144] Broker: amqp://username:[email protected]:5672//
[I 180219 14:58:15 command:147] Registered tasks: 
    ['celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 180219 14:58:15 mixins:224] Connected to amqp://username:[email protected]:5672//

I am deploying the DAG on the Airflow core machine, and I have also copied the sample data (Excel sheets) that the DAG will process to the same core machine.

My worker log raises the following error:

raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py' returned non-zero exit status 1
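
For debugging, a rough sketch of what can be run on the worker (reusing the command and path from the log above) to check whether it sees the DAG at all and to surface the underlying error:

# does this worker see any DAGs in its dags folder?
airflow list_dags

# re-run the exact command from the worker log to see the real error
airflow run dag_name_x task_name_xx 2018-02-19T10:15:41.657243 --local -sd /home/Distributedici/airflow/dags/sample_data_xx.py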

Now my questions are:

1) Should I copy the DAG folder to the worker computer as well?

2) Right now I have not copied the DAG folder to the worker computer, and I cannot see the worker process pick up the task.

Please point out where I am making a mistake and how to get the worker process to pick up the task.

Upvotes: 13

Views: 17459

Answers (4)

Bharath Palaksha

Reputation: 39

Yes, DAGs must be present on all Airflow nodes: worker, webserver, and scheduler.

You can have a cron job that runs git pull in your DAGs folder on all nodes to keep them in sync.
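
For example, a crontab entry on each node along these lines (the five-minute interval is arbitrary, and the path is only an example based on the question):

# pull the latest DAGs every five minutes
*/5 * * * * cd /home/Distributedici/airflow/dags && git pull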

Airflow is expected to move DAGs to the database instead of the file system; this feature might come in 2.0.

Upvotes: 1

Alessandro S.

Reputation: 1033

A little late on this, but it might still help someone. From the existing answers it looks as if there were no way to share DAGs other than "manual" deployment (via git/scp etc.), but there is one.

Airflow supports pickling (the -p parameter from the CLI, or command: scheduler -p in your docker-compose file), which allows you to deploy the DAGs on the server/master and have them serialized and sent to the workers, so you don't have to deploy DAGs in multiple places and you avoid issues with out-of-sync DAGs.
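
As a rough sketch, with the Airflow 1.x CLI this boils down to starting the scheduler with the pickle flag, so the DAGs are serialized into the metadata database and shipped to the workers from there:

# -p / --do_pickle: pickle DAGs and send them to the workers instead of having workers read local files
airflow scheduler -p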

Pickling is compatible with CeleryExecutor.

Pickling has some limitations that can bite you, notably that the actual code of classes and functions is not serialized (only the fully qualified name is), so you will get an error if you try to deserialize a DAG that refers to code you don't have in the target environment. For more info on pickle, have a look here: https://docs.python.org/3.3/library/pickle.html

Upvotes: 1

Taylor D. Edmiston

Reputation: 13016

Some of the biggest pain points with Airflow come up around deployment and keeping DAG files and plugins in sync across your Airflow scheduler, Airflow webserver, and Celery worker nodes.

We've created an open source project called Astronomer Open that automates a Dockerized setup of Airflow, Celery, and PostgreSQL with some other goodies baked in. The project was motivated by seeing so many people hit the same pain points while creating very similar setups.

For example, here's the Airflow Dockerfile: https://github.com/astronomer/astronomer/blob/master/docker/airflow/1.10.2/Dockerfile

And the docs: https://open.astronomer.io/

Full disclosure: This is a project I contribute to at work — we offer a paid enterprise edition as well that runs on Kubernetes (docs). That said, the Open Edition is totally free to use.

Upvotes: 6

Daniel Huang

Reputation: 6548

Your configuration files look okay. As you suspected, all workers do indeed require a copy of the DAG folder. You can use something like git to keep them in sync and up to date.

Upvotes: 4
