Aviral Srivastava

Reputation: 4582

How to have a mix of both Celery Executor and Kubernetes Executor in Apache Airflow?

I have multiple DAGs using the Celery Executor, but I want one particular DAG to run using the Kubernetes Executor. I have not been able to find a good, reliable way to achieve this.

I have an airflow.cfg that declares CeleryExecutor as the executor, and I don't want to change it, since it is needed by every DAG but one:

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

My dag code:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
    KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),  # NB: a fixed start_date is generally recommended
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample_1', default_args=default_args)


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",  # image and command names are lowercase
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

# This task is intended to fail: the image tag and the capitalized
# "Python" command are both invalid.
failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:1604",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)

I could add an if-else condition and change the value at the point where Airflow picks up the configuration. If that sounds right, please tell me which paths and files to change, although I was hoping for a more mature method, if one exists.

Upvotes: 7

Views: 5825

Answers (4)

Ayub Abuzer

Reputation: 1

The CeleryKubernetesExecutor allows users to run a CeleryExecutor and a KubernetesExecutor simultaneously. The executor that runs a task is chosen based on the task's queue.

The CeleryKubernetesExecutor inherits the scalability of the CeleryExecutor, to handle high load at peak times, and the runtime isolation of the KubernetesExecutor.

See the reference here: https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/celery_kubernetes_executor.html
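
For reference, a minimal sketch of the airflow.cfg entries involved, with section and key names as in the linked docs (assuming Airflow 2.x; kubernetes is the default queue name):

[core]
executor = CeleryKubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = kubernetes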

Upvotes: 0

caxefaizan

Reputation: 321

Starting with Airflow 2.x, configure airflow.cfg as follows: in the [core] section set executor = CeleryKubernetesExecutor, and in the [celery_kubernetes_executor] section set kubernetes_queue = kubernetes. Then, whenever you want a task instance to run on the Kubernetes executor, add the parameter queue='kubernetes' to the task definition. For example:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('celery_kubernetes_sample', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # queue='kubernetes' routes this task to the Kubernetes executor
    task1 = BashOperator(
        task_id='Test_kubernetes_executor',
        bash_command='echo Kubernetes',
        queue='kubernetes',
    )
    # no queue set, so this task runs on a Celery worker
    task2 = BashOperator(
        task_id='Test_Celery_Executor',
        bash_command='echo Celery',
    )

On running the DAG you will see task1 running on Kubernetes and task2 on Celery. Unless you set a task's queue to kubernetes, all tasks will run on the Celery executor.

Upvotes: 5

Alessandro S.

Reputation: 1043

There is now the CeleryKubernetesExecutor (I can't tell exactly when it was introduced), which requires setting up both Celery and Kubernetes, but also offers the functionality of both.

In the official documentation, they offer a rule of thumb to decide when it's worth using it:

We recommend considering the CeleryKubernetesExecutor when your use case meets:

The number of tasks needed to be scheduled at the peak exceeds the scale that your Kubernetes cluster can comfortably handle

A relatively small portion of your tasks requires runtime isolation.

You have plenty of small tasks that can be executed on Celery workers, but you also have resource-hungry tasks that are better run in predefined environments.

Upvotes: 5

Ashwin

Reputation: 1

I don't think it is possible to use both executors at once. But you can keep the CeleryExecutor and declare resource-intensive tasks with the KubernetesPodOperator, and the problem is solved: jobs are scheduled and watched by the CeleryExecutor, while the actual processing logic in those tasks runs on Kubernetes.
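
A minimal sketch of that approach, assuming the CeleryExecutor stays configured in airflow.cfg and the Celery workers can reach the cluster through a kubeconfig (the DAG id, image, and config_file path here are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
    KubernetesPodOperator

dag = DAG('celery_plus_pods', start_date=datetime(2021, 1, 1), schedule_interval=None)

# Scheduled and watched by the CeleryExecutor; the actual work runs
# in a pod that the operator launches on the cluster.
heavy = KubernetesPodOperator(namespace='default',
                              image='python:3.6',
                              cmds=['python', '-c'],
                              arguments=["print('hello world')"],
                              name='heavy-task',
                              task_id='heavy_task',
                              in_cluster=False,  # assumption: connect via kubeconfig
                              config_file='/path/to/kubeconfig',  # illustrative path
                              get_logs=True,
                              dag=dag
                              )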

Upvotes: 0
