I'm a beginner in Apache Airflow. I have a requirement to schedule tasks based on a JSON payload. Simply put, that JSON payload contains the date and time and the data required to schedule the task. Can I accommodate this requirement using Apache Airflow? The simple architecture diagram that I want to implement is as follows.
As in the above diagram, I want to trigger a DAG with a JSON payload and schedule that data on a RabbitMQ broker according to the date and time in the payload. When the date and time condition is met, the broker should pop that data from the queue and assign it to a worker node for execution (as distributed workers). So, inside the worker node, the Python operator can be used to invoke the backend.
Am I following the correct path here? As per my understanding, the RabbitMQ and worker node parts are handled automatically by the CeleryExecutor. Can I schedule the tasks dynamically using another DAG in Airflow? Here I used the single-file method to create dynamic DAGs in Airflow.
EDIT
To accommodate the above requirement, I wrote a custom DAG as follows. But once I invoke it, it does not create the DAG called my_scheduler. Any idea?
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def _schedule_task(*args):
    print('Task scheduling....')


def _create_dag(dag_id, schedule, default_args):
    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        t1 = PythonOperator(task_id='scheduler', python_callable=_schedule_task)
    return dag


def _training_model(**kwargs):
    print(kwargs['dag_run'].conf)
    print("Triggered !!")
    dag_id = 'my_scheduler'
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2021, 1, 1),
        'catchup': False
    }
    schedule = '@once'
    globals()[dag_id] = _create_dag(dag_id, schedule, default_args)
    print("Done !!")


with DAG("schedule_dag", start_date=datetime(2021, 1, 1),
         schedule_interval="@once", catchup=False) as dag:
    training_model = PythonOperator(
        task_id="main_model",
        python_callable=_training_model
    )
    training_model
Yes, the scheduling and execution parts are handled by Airflow using the CeleryExecutor (note: some suggest using Redis instead of RabbitMQ as the Celery broker).
To pass a JSON payload to your DAG you can utilise the DagRun conf. Then your tasks within the DAG should reference the conf value either using templated fields ({{ dag_run.conf }}) or, if you plan to use the PythonOperator, you can access the object directly:
from airflow.models import DagRun

def python_callable(dag_run: DagRun):
    xyz = dag_run.conf["xyz"]  # this would be your JSON payload
    ...
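For the templated-field route, here is a minimal sketch (the BashOperator and the "xyz" key are only illustrative, not something from the question); the field is rendered at runtime from whatever conf the run was triggered with:

from airflow.operators.bash import BashOperator

# Illustrative only: "xyz" is an assumed key in the triggering payload.
echo_payload = BashOperator(
    task_id="echo_payload",
    bash_command="echo {{ dag_run.conf['xyz'] }}",  # bash_command is a templated field
)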
"When the date and time condition met"
Here I understand that you want to postpone the execution of the work until the date and time from the payload are reached. If that's true, then you can use a sensor that will pause the execution until some conditions are met. In other words I would build the DAG as check_conditions_sensor >> execute_work.
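As a sketch of that idea, a DateTimeSensor could hold the run until a timestamp taken from the payload (the "execute_at" key is an assumption, not something defined in the question):

from airflow.sensors.date_time import DateTimeSensor

# Assumed payload key, e.g. triggered with {"execute_at": "2021-06-01T10:00:00"}.
check_conditions_sensor = DateTimeSensor(
    task_id="check_conditions_sensor",
    target_time="{{ dag_run.conf['execute_at'] }}",  # target_time is a templated field
)
# check_conditions_sensor >> execute_work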
Based on the discussion in the comments, it seems that the author of the question wants to create a new DAG and schedule it using the values (date, time and interval) from the JSON payload.
To do that a DAG has to be created, and that is not supported by the Airflow REST API. A DAG is a Python file and it has to be uploaded to the Airflow instance.
While generating a DAG is the easy part (for example by using Jinja templates and generating a Python file), distribution of the DAG will be more tricky because it depends on the type of deployment and it also imposes some security risks.
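As a rough sketch of what such a template could look like (the placeholder names dag_id, start_date and schedule are assumptions based on the payload described in the question):

# Hypothetical DAG template; the placeholders are filled from the JSON payload.
DAG_TEMPLATE = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG("{dag_id}",
         start_date=datetime.fromisoformat("{start_date}"),
         schedule_interval="{schedule}",
         catchup=False) as dag:
    PythonOperator(task_id="run_backend", python_callable=lambda: print("work"))
'''

# e.g. DAG_TEMPLATE.format(dag_id="my_scheduler", start_date="2021-01-01", schedule="@once")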
I would do that by creating a DAGFactory DAG that would be called via the Airflow REST API and would build new DAGs using the dag_run.conf passed in the request. However, the resulting file generated by this DAG should be uploaded to a place from where all other DAGs are distributed (git repo, S3 bucket, etc.). Example below using TaskFlow.
from airflow import DAG
from airflow.decorators import task
from airflow.models import DagRun


@task
def get_template() -> str:
    # Here we get the template from remote so
    # we can easily update it and version it
    return template


@task
def build_dag(template: str, dag_run: DagRun = None) -> str:
    dag_content = template.format(**dag_run.conf)
    return dag_content


@task
def upload_dag(dag_content: str):
    # upload to git repo, S3 etc. so the DAG will be available in Airflow
    ...


with DAG("DAGFactory", ...) as dag:
    template = get_template()
    dag_content = build_dag(template)
    upload_dag(dag_content)
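To call such a factory from an external system, a run can be triggered with a conf payload through the Airflow 2 stable REST API. A rough sketch, assuming basic auth is enabled; the URL, credentials and payload keys are all illustrative:

import requests

# Assumed: Airflow 2 stable REST API; the conf keys match the template placeholders above.
requests.post(
    "http://localhost:8080/api/v1/dags/DAGFactory/dagRuns",
    auth=("admin", "admin"),
    json={"conf": {"dag_id": "my_scheduler",
                   "start_date": "2021-01-01",
                   "schedule": "@once"}},
)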