Reputation: 4582
I have multiple dags using Celery Executor but I want one particular dag to run using Kubernetes Executor. I am unable to deduce a good and reliable way to achieve this.
I have an airflow.cfg in which I have declared CeleryExecutor to be used, and I don't want to change it since it is really needed in all the DAGs but one.
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor
My DAG code:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import \
    KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('kubernetes_sample_1', default_args=default_args)

start = DummyOperator(task_id='run_this_first', dag=dag)

# Image and command names must be lowercase ("python:3.6", "python"),
# otherwise the pod fails before the script runs.
passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

# This task is meant to fail: the ubuntu image has no Python interpreter.
failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:16.04",
                                cmds=["python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)
I could put in an if-else condition and change the value at the point where Airflow picks up the configuration. If that sounds right, please tell me the paths and files involved. I was hoping there is a more mature method, though, if one exists.
Upvotes: 7
Views: 5825
Reputation: 1
The CeleryKubernetesExecutor allows users to run a CeleryExecutor and a KubernetesExecutor simultaneously. An executor is chosen to run a task based on the task's queue.
The CeleryKubernetesExecutor inherits the scalability of the CeleryExecutor, to handle high load at peak times, and the runtime isolation of the KubernetesExecutor.
See the reference here: https://airflow.apache.org/docs/apache-airflow-providers-celery/stable/celery_kubernetes_executor.html
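As a rough sketch, the queue-based routing described above comes down to two settings in airflow.cfg (the queue name kubernetes below is only an assumption here; it can be renamed):
# The executor class that airflow should use
[core]
executor = CeleryKubernetesExecutor
# Tasks whose queue matches this name are handed to the KubernetesExecutor
[celery_kubernetes_executor]
kubernetes_queue = kubernetes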
Upvotes: 0
Reputation: 321
Starting with Airflow 2.x, configure airflow.cfg as follows:
In the [core] section set executor = CeleryKubernetesExecutor, and in the [celery_kubernetes_executor] section set kubernetes_queue = kubernetes. Then, whenever you want to run a task instance on the Kubernetes executor, add the parameter queue = 'kubernetes' to the task definition. For example:
from airflow.operators.bash import BashOperator

# Routed to the KubernetesExecutor because of the queue parameter
task1 = BashOperator(
    task_id='Test_kubernetes_executor',
    bash_command='echo Kubernetes',
    queue='kubernetes'
)

# No queue set, so this task stays on the default queue and runs on Celery
task2 = BashOperator(
    task_id='Test_Celery_Executor',
    bash_command='echo Celery',
)
On running the DAG you will see task1 running on the Kubernetes executor and task2 on Celery. So unless you set a task's queue to kubernetes, every task in the DAG will run on the Celery executor.
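Applied to the DAG from the question, the same idea would look roughly like this; a sketch only, assuming kubernetes_queue is left at the name kubernetes and the dag object from the question is reused:
# Only this task is routed to the KubernetesExecutor; every other task in the
# DAG keeps the default queue and therefore keeps running on Celery workers.
passing = KubernetesPodOperator(namespace='default',
                                image='python:3.6',
                                cmds=['python', '-c'],
                                arguments=["print('hello world')"],
                                name='passing-test',
                                task_id='passing-task',
                                get_logs=True,
                                queue='kubernetes',
                                dag=dag
                                )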
Upvotes: 5
Reputation: 1043
Now there is the CeleryKubernetesExecutor (I can't tell when exactly it was introduced), which requires setting up both Celery and Kubernetes, but also offers the functionality of both.
The official documentation offers a rule of thumb to decide when it's worth using:
We recommend considering the CeleryKubernetesExecutor when your use case meets:
The number of tasks needed to be scheduled at the peak exceeds the scale that your Kubernetes cluster can comfortably handle.
A relatively small portion of your tasks requires runtime isolation.
You have plenty of small tasks that can be executed on Celery workers but you also have resource-hungry tasks that will be better to run in predefined environments.
Upvotes: 5
Reputation: 1
I don't think it is possible to use both executors. But you can just use CeleryExecutor and declare the resource-intensive tasks with KubernetesPodOperator; problem solved: jobs are scheduled and watched by the CeleryExecutor, while the actual processing logic in those tasks runs on Kubernetes.
Upvotes: 0