Jonny5

Reputation: 1499

Best practices for multiple clients in Apache Airflow/Cloud Composer?

Problem:

Are there any best practices in Airflow to keep things simple? I'm thinking about (in no specific order):

I cannot find a lot of material on this particular use case.

Ideally we have one "template" that is re-used per client. It is unclear whether one job or multiple jobs are the best solution. Or maybe there is another way that better suits this usage?

Upvotes: 3

Views: 1595

Answers (1)

aga

Reputation: 3893

Airflow has extensive support for the Google Cloud Platform. Note, however, that most Hooks and Operators are in the contrib section, which means they have beta status and can have breaking changes between minor releases.

Number of client aspects:

There can be as many DAGs as needed, and each of them can contain multiple tasks. It is recommended to keep one logical workflow in one DAG file and to keep that file very light (e.g. a configuration file). That way the Airflow scheduler needs less time and fewer resources to process the files at each heartbeat.

It is possible to create DAGs (with the same base code) dynamically based on any number of configuration parameters, which is a really helpful and time-saving option when you have a lot of clients.

To create new DAGs, define a DAG template within a create_dag function. The code can be wrapped in a method that allows custom parameters to be passed in. Moreover, the input parameters don't have to exist in the DAG file itself. Another common way of generating DAGs is by setting values in a Variable object. Please refer here for further information.
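A minimal sketch of the create_dag pattern, assuming Airflow 1.10-style imports and parameters (DummyOperator lives under a different path in later releases, and schedule_interval has since been renamed). The CLIENTS dictionary, the dag_kwargs and register_dags helpers, and the "load_" prefix are hypothetical names chosen for illustration; the per-client values could equally come from a config file or an Airflow Variable.

```python
from datetime import datetime, timedelta

# Hypothetical per-client settings (illustration only). In practice these
# could be loaded from a JSON/YAML file or from an Airflow Variable.
CLIENTS = {
    "client_a": {"schedule": "@daily", "retries": 1},
    "client_b": {"schedule": "@hourly", "retries": 3},
}


def dag_kwargs(client_id, conf):
    """Build the keyword arguments for one client's DAG (pure Python)."""
    return {
        "dag_id": "load_{}".format(client_id),
        "schedule_interval": conf["schedule"],
        "default_args": {
            "owner": "Airflow",
            "start_date": datetime(2015, 6, 1),
            "retries": conf["retries"],
            "retry_delay": timedelta(minutes=5),
        },
    }


def create_dag(client_id, conf):
    """Create one DAG from the shared template. Requires Airflow."""
    # Imported lazily so the helpers above can be used without Airflow.
    # Assumption: Airflow 1.10 import paths.
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG(**dag_kwargs(client_id, conf))
    start = DummyOperator(task_id="start", dag=dag)
    end = DummyOperator(task_id="end", dag=dag)
    start >> end  # the same task layout for every client
    return dag


def register_dags(namespace):
    """Put one DAG per client into the given module namespace."""
    for client_id, conf in CLIENTS.items():
        dag = create_dag(client_id, conf)
        namespace[dag.dag_id] = dag
```

In a DAG file you would call register_dags(globals()) at module level, since the scheduler picks up DAG objects it finds in the module's global namespace.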

Specific client configs:

Macros are used to pass dynamic information into task instances at runtime. A list of default variables accessible in all templates can be found here.

Airflow’s built-in support for Jinja templating enables users to pass arguments that can be used in templated fields.
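As a rough sketch of how this could look per client, the command below mixes the default variables ds and ds_nodash with values passed through params. It assumes the Airflow 1.10 import path for BashOperator; EXPORT_COMMAND, client_params, make_export_task and the bucket value are hypothetical names for illustration.

```python
# The Jinja placeholders stay unrendered here; Airflow renders them for
# each task instance because bash_command is a templated field.
EXPORT_COMMAND = (
    "echo 'client={{ params.client_id }} "
    "run_date={{ ds }} "
    "target=gs://{{ params.bucket }}/{{ ds_nodash }}/'"
)


def client_params(client_id, bucket):
    """Client-specific values exposed to the template as params.<name>."""
    return {"client_id": client_id, "bucket": bucket}


def make_export_task(dag, client_id, bucket):
    """Attach one templated task for a client to the DAG. Requires Airflow."""
    # Assumption: Airflow 1.10 import path; newer releases moved this module.
    from airflow.operators.bash_operator import BashOperator

    return BashOperator(
        task_id="export_{}".format(client_id),
        bash_command=EXPORT_COMMAND,
        params=client_params(client_id, bucket),
        dag=dag,
    )
```

The same command string is shared by all clients; only the params dictionary differs between task instances.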

UI overview

If your DAG takes a long time to load, you could reduce the value of the default_dag_run_display_number configuration in airflow.cfg. This setting controls the number of DAG runs to show in the UI; the default value is 25.

Modularity

If a dictionary of default_args is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.

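For example:

from datetime import datetime, timedelta

# Import paths as in Airflow 1.10; DummyOperator has moved in later releases.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator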

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow

For more information about the BaseOperator’s parameters and what they do, refer to the airflow.models.BaseOperator documentation.

Performance

There are several settings you can tune to improve Airflow DAG performance (some of them can be set in airflow.cfg):

  • parallelism: controls the number of task instances that run simultaneously across the whole Airflow cluster.
  • concurrency: the Airflow scheduler will run no more than concurrency task instances for your DAG at any given time. Concurrency is defined in your Airflow DAG. If you do not set it on your DAG, the scheduler uses the default value from the dag_concurrency entry in your airflow.cfg.
  • task_concurrency: controls the number of concurrently running task instances across dag_runs per task.
  • max_active_runs: the Airflow scheduler will run no more than max_active_runs DagRuns of your DAG at a given time.
  • pool: controls the number of concurrently running task instances assigned to the pool.
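The DAG-level and task-level limits above could be applied roughly as follows. This is a sketch assuming Airflow 1.10 parameter names (concurrency and task_concurrency were renamed in later releases); the numbers, the "client_api" pool and the build_limited_dag helper are hypothetical, and the pool would have to be created in Airflow beforehand.

```python
# Limits passed to the DAG constructor (Airflow 1.10 parameter names).
DAG_LIMITS = {
    "concurrency": 4,      # at most 4 running task instances for this DAG
    "max_active_runs": 1,  # at most 1 active DagRun at a time
}

# Limits passed to an individual operator.
TASK_LIMITS = {
    "task_concurrency": 2,  # at most 2 running instances of this task
    "pool": "client_api",   # hypothetical pool; its size caps concurrency
}


def build_limited_dag(dag_id, default_args):
    """Create a DAG with the limits above applied. Requires Airflow."""
    # Assumption: Airflow 1.10 import paths.
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    # default_args is expected to carry a start_date for the tasks.
    dag = DAG(dag_id, default_args=default_args, **DAG_LIMITS)
    DummyOperator(task_id="call_client_api", dag=dag, **TASK_LIMITS)
    return dag
```

parallelism is not shown because it is a cluster-wide setting in airflow.cfg rather than a DAG or task argument.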

You can see the Airflow config in the Composer instance bucket, gs://composer_instance_bucket/airflow.cfg. You can tune this configuration as you wish, but keep in mind that Cloud Composer has some configurations blocked.

Scaling

Keep in mind that it is recommended to run at least 3 nodes; going below 3 could cause some issues. If you want to increase the number of nodes, you can use the gcloud command to specify this value. Also note that some Airflow configurations related to autoscaling are preconfigured for Cloud Composer and cannot be overridden.

Fault-tolerance

Please refer to the following documentation.

Re-execution

Just as an object is an instance of a class, an Airflow task is an instance of an Operator (BaseOperator). So write a "re-usable" operator and use it hundreds of times across your pipelines simply by passing different params.

Latency

It is possible to reduce Airflow DAG scheduling latency in production by tuning:

  • max_threads: the scheduler spawns multiple threads in parallel to schedule DAGs. The number of threads is controlled by max_threads, with a default value of 2.
  • scheduler_heartbeat_sec: consider increasing scheduler_heartbeat_sec to a higher value (e.g. 60 secs). It controls how frequently the Airflow scheduler gets the heartbeat and updates the job's entry in the database.
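To show where these two settings live, the sketch below parses a hypothetical airflow.cfg fragment with Python's standard configparser. It assumes the Airflow 1.10 layout, where both keys sit in the [scheduler] section; the values 4 and 60 are examples only, and read_scheduler_settings is a helper name made up for illustration.

```python
import configparser

# Hypothetical fragment of airflow.cfg (example values, not recommendations).
AIRFLOW_CFG_SNIPPET = """
[scheduler]
max_threads = 4
scheduler_heartbeat_sec = 60
"""


def read_scheduler_settings(cfg_text):
    """Return the two scheduler settings from airflow.cfg-style text."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["scheduler"]
    return {
        "max_threads": section.getint("max_threads"),
        "scheduler_heartbeat_sec": section.getint("scheduler_heartbeat_sec"),
    }


settings = read_scheduler_settings(AIRFLOW_CFG_SNIPPET)
print(settings)
```

On Cloud Composer these values are normally changed through Airflow configuration overrides on the environment rather than by editing the file directly.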

Please refer to the following articles about best practices:

I hope this helps in some way.

Upvotes: 4
