Alexis.Rolland

Reputation: 6363

Change execution concurrency of Airflow DAG

I would like to change the dag_concurrency parameter of a specific Airflow DAG. It seems there is a global dag_concurrency parameter in airflow.cfg, but is it possible to set different values for different DAGs?

I have tried adding a concurrency parameter to the SSHExecuteOperator task in my DAG code, but the concurrency value shown in the DAG details is still the default (16).
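For reference, the global default that shows up as 16 lives in the [core] section of airflow.cfg (stock default shown; your install may differ):

[core]
# Upper bound on task instances allowed to run concurrently per DAG
dag_concurrency = 16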

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['[email protected]'],
  'email_on_failure': True,
  'retries': 0
}

# Server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'ed_data_quality_20min-v1.6.6',
        default_args=default_args,
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24)) as dag:

    # concurrency is set on the task here, yet the DAG details still show 16
    SSHExecuteOperator(
        task_id='run_remote_ed_data_quality_20min',
        bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
        ssh_hook=sshHookEtl,
        retries=0,
        concurrency=1)

Here are the DAG details (screenshot not included).

Upvotes: 6

Views: 9451

Answers (2)

Alexis.Rolland

Reputation: 6363

I found the solution: I was not adding the concurrency parameter in the right place. It should be passed to the DAG object directly, not to the SSHExecuteOperator task. With concurrency=1 on the DAG, at most one task instance of this DAG runs at a time, even when several scheduled runs overlap. Here is the new code:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['[email protected]'],
  'email_on_failure': True,
  'retries': 0
}

# Server must be changed to point to the correct environment
sshHookEtl = SSHHook(conn_id='SSH__airflow@myserver')

with DAG(
        'ed_data_quality_20min-v1.6.6',
        default_args=default_args,
        schedule_interval="0,20,40 * * * *",
        dagrun_timeout=timedelta(hours=24),
        concurrency=1) as dag:

    # concurrency now lives on the DAG object, not on the task
    SSHExecuteOperator(
        task_id='run_remote_ed_data_quality_20min',
        bash_command='bash /opt/scripts/shell/EXEC_ED_DATA_QUALITY_20MIN.sh ',
        ssh_hook=sshHookEtl,
        retries=0)

Upvotes: 5

dlamblin

Reputation: 45361

Okay… You can just set concurrency on the DAG object. There is also a task_concurrency parameter on BaseOperator, which throttles a single task rather than the whole DAG. There is no concurrency parameter or field on SSHExecuteOperator or BaseOperator.
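For completeness, a minimal sketch of the task-level knob mentioned above, assuming an Airflow 1.x install (the DAG id, task id, and bash command are placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG('example_task_concurrency',
         start_date=datetime(2018, 1, 1),
         schedule_interval='@hourly') as dag:

    # task_concurrency caps how many instances of this one task
    # may run at the same time across all active DAG runs
    check = BashOperator(
        task_id='run_check',
        bash_command='echo "placeholder command"',
        task_concurrency=1)

Unlike the DAG-level concurrency setting, task_concurrency only limits that one task; other tasks in the same DAG are unaffected.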

Upvotes: 1
