Sajitha Liyanage

Reputation: 473

Airflow distributed message flows with RabbitMQ message broker

I'm a beginner with Apache Airflow. I have a requirement to schedule tasks based on a JSON payload. Simply put, that JSON payload contains the date and time plus the data required to schedule the task. Can I accommodate this requirement using Apache Airflow? The simple architecture diagram that I want to implement is as follows.

[architecture diagram]

As in the above diagram, I want to trigger a DAG with a JSON payload and schedule that data on a RabbitMQ broker according to the date and time in the payload. When the date and time condition is met, the broker should pop that data from the queue and assign it to a worker node for execution (as distributed workers). Inside the worker node, a PythonOperator can then be used to invoke the backend.

Am I on the correct path here? As per my understanding, the RabbitMQ and worker node parts are handled automatically by the CeleryExecutor. Can I schedule the tasks dynamically using another DAG in Airflow? Here I used the single-file method to create dynamic DAGs in Airflow.

EDIT

To accommodate the above requirement, I wrote a custom DAG as follows. But once I trigger it, it does not create the DAG called my_scheduler. Any idea?

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime

def _schedule_task(*args):
    print('Task scheduling....')
        
def _create_dag(dag_id, schedule, default_args):
    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        t1 = PythonOperator(task_id='scheduler', python_callable=_schedule_task)
    return dag

def _training_model(**kwargs):
    print(kwargs['dag_run'].conf)
    print("Triggered !!")
    dag_id = 'my_scheduler'
    default_args = {
                    'owner': 'airflow',
                    'start_date': datetime(2021, 1, 1),
                    'catchup': False
                    }
    schedule = '@once'
    globals()[dag_id] = _create_dag(dag_id, schedule, default_args)
    print("Done !!")

with DAG("schedule_dag", start_date=datetime(2021, 1, 1),
    schedule_interval="@once", catchup=False) as dag:

    training_model = PythonOperator(
        task_id="main_model",
        python_callable=_training_model
    )

    training_model

Upvotes: 2

Views: 6023

Answers (1)

Tomasz Urbaszek

Reputation: 808

Yes, the scheduling and execution part is handled by Airflow using the CeleryExecutor (note: some suggest using Redis instead of RabbitMQ as the broker).

To pass a JSON payload to your DAG you can utilise the DagRun conf. Then the tasks within your DAG should reference the conf value either via templated fields ({{ dag_run.conf }}) or, if you plan to use the PythonOperator, by accessing the object directly:

from airflow.models import DagRun

def python_callable(dag_run: DagRun):
    xyz = dag_run.conf["xyz"]  # this would be your JSON payload
    ...
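
For completeness, a minimal sketch of how such a DAG could be triggered with a payload via Airflow's stable REST API (the host, credentials and payload keys below are placeholders for your deployment, and I assume basic auth is enabled on the webserver):

import requests

# Hypothetical host, credentials and payload -- adjust to your deployment.
AIRFLOW_URL = "http://localhost:8080/api/v1/dags/schedule_dag/dagRuns"
payload = {"conf": {"xyz": "some value", "execute_at": "2021-06-01T10:00:00"}}

response = requests.post(
    AIRFLOW_URL,
    json=payload,
    auth=("airflow", "airflow"),  # basic auth, if enabled in the webserver
)
response.raise_for_status()
print(response.json())  # details of the created DAG run, including its run_id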

When the date and time condition is met

Here I understand that you want to:

  1. Trigger the DAG via the API with a JSON payload.
  2. Have the DAG not execute the PythonOperator logic instantly, but wait for other conditions.

If that's true, then you can use a sensor that will pause the execution until some condition is met. In other words, I would build the DAG as check_conditions_sensor >> execute_work.
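
A minimal sketch of that shape, assuming the payload carries an ISO timestamp under a hypothetical execute_at key and using the built-in DateTimeSensor to wait for it (the DAG and task names are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.date_time import DateTimeSensor

def _execute_work(dag_run=None):
    # The rest of the payload is available here once the sensor has fired.
    print(dag_run.conf)

with DAG("payload_scheduler", start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False) as dag:

    # Waits until the timestamp passed in the trigger payload, e.g.
    # {"execute_at": "2021-06-01T10:00:00"} (hypothetical key).
    check_conditions_sensor = DateTimeSensor(
        task_id="check_conditions_sensor",
        target_time="{{ dag_run.conf['execute_at'] }}",
    )

    execute_work = PythonOperator(
        task_id="execute_work",
        python_callable=_execute_work,
    )

    check_conditions_sensor >> execute_work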

EDIT

Based on the discussion in the comments, it seems that the author of the question wants to create a new DAG and schedule it using the values (date, time and interval) from the JSON payload.

To do that, a DAG has to be created, and that is not supported by the Airflow REST API. A DAG is a Python file and it has to be uploaded to the Airflow instance.

While generating a DAG is the easy part (for example, by using Jinja templates to generate the Python file), distributing the DAG will be trickier because it depends on the type of deployment, and it also poses some security risks.
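
For illustration, a minimal sketch of the template idea (the template string, DAG id and schedule/start_date keys below are just placeholders, not a fixed format):

from jinja2 import Template

# Hypothetical template of a generated DAG file; real templates would live
# in version control and be fetched by the factory DAG.
DAG_TEMPLATE = Template('''
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG("{{ dag_id }}",
         start_date=datetime.fromisoformat("{{ start_date }}"),
         schedule_interval="{{ schedule }}",
         catchup=False) as dag:
    PythonOperator(task_id="scheduler", python_callable=print)
''')

dag_file = DAG_TEMPLATE.render(
    dag_id="my_scheduler",
    start_date="2021-06-01T10:00:00",
    schedule="@once",
)
# dag_file now holds the content of a Python file that can be uploaded
# to the DAGs folder (or to git/S3, depending on the deployment).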

I would do that by creating a DAGFactory DAG that would be called via the Airflow REST API and would build new DAGs using the dag_run.conf passed in the request. However, the resulting file generated by this DAG should be uploaded to the place from which all other DAGs are distributed (a git repo, an S3 bucket, etc.). Example below using the TaskFlow API.

from airflow import DAG
from airflow.decorators import task
from airflow.models import DagRun

@task
def get_template() -> str:
    # Here we get the template from remote so
    # we can easily update it and version it
    return template

@task
def build_dag(template: str, dag_run: DagRun = None) -> str:
    dag_content = template.format(dag_run.conf)
    return dag_content

@task
def upload_dag(dag_content: str):
    # upload to git repo, S3 etc. so the DAG will be available in Airflow
    ...

with DAG("DAGFactory", ...) as dag:
    template = get_template()
    dag_content = build_dag(template)
    upload_dag(dag_content)
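
Note that the generated DAG will only show up in Airflow after the scheduler has parsed the uploaded file from the DAGs folder (or after the git/S3 sync used by your deployment has run), so expect some delay between triggering DAGFactory and the new DAG becoming available.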

Upvotes: 2
