We moved to Airflow 1.10.2 to resolve a CPU usage issue, and the good news is that the issue is fixed in our environment. However, we have observed that although the DAG's tasks are submitted and show as running on the Airflow dashboard, their actual processing is held up: they appear to remain in the queue for about 60 seconds before execution actually starts. Please note that our implementation uses executor = LocalExecutor.
A Python script watches a directory for arriving files, and for every file it observes, it triggers the Airflow DAG. Files arrive in bundles, so at any given moment multiple instances of the same DAG are invoked (code snippet provided below). Each triggered DAG has a task that calls a Python script to launch a Kubernetes pod, where the file-related processing happens. Below is an excerpt from the DAG code:
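import os
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10 import path
# cons (project constants module) and dag are defined elsewhere in the DAG file
positional_to_ascii = BashOperator(
    task_id="uncompress_the_file",
    # Launch the Kubernetes job script with values taken from the triggering run's conf
    bash_command='python3.6 ' + os.path.join(cons.CODE_REPO, 'app/Code/k8Job/create_kubernetes_job.py') + ' POS-PREPROCESSING {{ dag_run.conf["inputfilepath"] }} {{ dag_run.conf["frt_id"] }}',
    execution_timeout=None,
    dag=dag)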
Once this task completes, it triggers another DAG, which has a task that processes the output of the previous DAG.
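The file-watching trigger described above could look roughly like the following. This is a minimal, hypothetical sketch, not our actual code: it polls the directory with os.listdir instead of using a file-system event library, and the function names, DAG id and frt_id value are illustrative assumptions. It only builds the Airflow 1.10 CLI call (airflow trigger_dag -c with a JSON conf, which the DAG then reads through dag_run.conf), so one DAG run is created per file.

```python
import json
import os
from typing import List, Set


def find_new_files(directory: str, seen: Set[str]) -> List[str]:
    """Return regular files in `directory` not seen before, and remember them (hypothetical helper)."""
    new_paths = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if os.path.isfile(path) and path not in seen:
            seen.add(path)
            new_paths.append(path)
    return new_paths


def build_trigger_command(dag_id: str, input_path: str, frt_id: str) -> List[str]:
    """Build an `airflow trigger_dag` call; the keys match dag_run.conf[...] in the DAG excerpt."""
    conf = {"inputfilepath": input_path, "frt_id": frt_id}
    return ["airflow", "trigger_dag", "-c", json.dumps(conf), dag_id]
```

In a real watcher loop, find_new_files would be called periodically and each command run with subprocess.run(cmd, check=True); a bundle of files then produces several concurrent runs of the same DAG, as described above.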
Below are a few of our airflow.cfg parameters, which may help in identifying the root cause:
min_file_process_interval = 60
dag_dir_list_interval = 300
max_threads = 2
dag_concurrency = 16
worker_concurrency = 16
max_active_runs_per_dag = 16
parallelism = 32
sql_alchemy_conn = mysql://airflow:fewfw324$gG@someXserver:3306/airflow
executor = LocalExecutor
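For reference, in an Airflow 1.10 airflow.cfg these settings live in different sections: executor, parallelism, dag_concurrency and max_active_runs_per_dag under [core]; min_file_process_interval, dag_dir_list_interval and max_threads under [scheduler]; and worker_concurrency under [celery], where it is only used by the CeleryExecutor, so it should have no effect with LocalExecutor. The sketch below is a trimmed, illustrative fragment (not a complete airflow.cfg, and with sql_alchemy_conn left out so the credentials are not repeated), parsed with Python's configparser.

```python
import configparser

# Trimmed airflow.cfg fragment using the values listed above (sql_alchemy_conn omitted).
CFG_TEXT = """
[core]
executor = LocalExecutor
parallelism = 32
dag_concurrency = 16
max_active_runs_per_dag = 16

[scheduler]
min_file_process_interval = 60
dag_dir_list_interval = 300
max_threads = 2

[celery]
worker_concurrency = 16
"""

cfg = configparser.ConfigParser()
cfg.read_string(CFG_TEXT)

# Minimum number of seconds between two scheduler passes over the same DAG file.
min_interval = cfg.getint("scheduler", "min_file_process_interval")
print(cfg.get("core", "executor"), min_interval)
```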
The total DagBag parsing time is 1.305286. Below is the output of the command airflow list_dags -r:
-------------------------------------------------------------------
DagBag loading stats for /root/airflow/dags
-------------------------------------------------------------------
Number of DAGs: 7
Total task number: 23
DagBag parsing time: 1.305286
------------------------------+----------+---------+----------+------------------------------
file | duration | dag_num | task_num | dags
------------------------------+----------+---------+----------+------------------------------
/trigger_cleansing.py | 0.876388 | 1 | 5 | ['trigger_cleansing']
/processing_ebcdic_trigger.py | 0.383038 | 1 | 1 | ['processing_ebcdic_trigger']
/prep_preprocess_dag.py | 0.015474 | 1 | 6 | ['prep_preprocess_dag']
/prep_scale_dag.py | 0.012098 | 1 | 6 | ['dataprep_scale_dag']
/mvp.py | 0.010832 | 1 | 2 | ['dg_a']
/prep_uncompress_dag.py | 0.004142 | 1 | 2 | ['dataprep_unzip_decrypt_dag']
/prep_positional_trigger.py | 0.003314 | 1 | 1 | ['prep_positional_trigger']
------------------------------+----------+---------+----------+------------------------------
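As a quick sanity check, the per-file rows of this report add up exactly to the reported totals (7 DAGs, 23 tasks, 1.305286 parsing time). Assuming the durations are in seconds, parsing the whole DAG folder takes only a small fraction of the roughly 60-second delay we observe. The sketch below re-parses rows pasted from the report; summarize is a hypothetical helper, not part of Airflow.

```python
from typing import Tuple

# Rows copied from the `airflow list_dags -r` output above (header and one separator kept).
REPORT = """\
file | duration | dag_num | task_num | dags
------------------------------+----------+---------+----------+------------------------------
/trigger_cleansing.py | 0.876388 | 1 | 5 | ['trigger_cleansing']
/processing_ebcdic_trigger.py | 0.383038 | 1 | 1 | ['processing_ebcdic_trigger']
/prep_preprocess_dag.py | 0.015474 | 1 | 6 | ['prep_preprocess_dag']
/prep_scale_dag.py | 0.012098 | 1 | 6 | ['dataprep_scale_dag']
/mvp.py | 0.010832 | 1 | 2 | ['dg_a']
/prep_uncompress_dag.py | 0.004142 | 1 | 2 | ['dataprep_unzip_decrypt_dag']
/prep_positional_trigger.py | 0.003314 | 1 | 1 | ['prep_positional_trigger']
"""


def summarize(report: str) -> Tuple[float, int, int]:
    """Sum the duration, dag_num and task_num columns of the report rows."""
    total_duration, total_dags, total_tasks = 0.0, 0, 0
    for line in report.splitlines():
        cols = [c.strip() for c in line.split("|")]
        if len(cols) != 5:
            continue  # separator lines use '+' rather than '|'
        try:
            duration = float(cols[1])
        except ValueError:
            continue  # header row: 'file | duration | ...'
        total_duration += duration
        total_dags += int(cols[2])
        total_tasks += int(cols[3])
    return round(total_duration, 6), total_dags, total_tasks


print(summarize(REPORT))  # (1.305286, 7, 23)
```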
Below is the status of the airflow-scheduler service, which shows multiple scheduler processes:
systemctl status airflow-scheduler
● airflow-scheduler.service - Airflow scheduler daemon
Loaded: loaded (/etc/systemd/system/airflow-scheduler.service; enabled; vendor preset: disabled)
Active: active (running) since Sat 2019-03-09 04:44:29 EST; 33min ago
Main PID: 37409 (airflow)
CGroup: /system.slice/airflow-scheduler.service
├─37409 /usr/bin/python3.6 /bin/airflow scheduler
├─37684 /usr/bin/python3.6 /bin/airflow scheduler
├─37685 /usr/bin/python3.6 /bin/airflow scheduler
├─37686 /usr/bin/python3.6 /bin/airflow scheduler
├─37687 /usr/bin/python3.6 /bin/airflow scheduler
├─37688 /usr/bin/python3.6 /bin/airflow scheduler
├─37689 /usr/bin/python3.6 /bin/airflow scheduler
├─37690 /usr/bin/python3.6 /bin/airflow scheduler
├─37691 /usr/bin/python3.6 /bin/airflow scheduler
├─37692 /usr/bin/python3.6 /bin/airflow scheduler
├─37693 /usr/bin/python3.6 /bin/airflow scheduler
├─37694 /usr/bin/python3.6 /bin/airflow scheduler
├─37695 /usr/bin/python3.6 /bin/airflow scheduler
├─37696 /usr/bin/python3.6 /bin/airflow scheduler
├─37697 /usr/bin/python3.6 /bin/airflow scheduler
├─37699 /usr/bin/python3.6 /bin/airflow scheduler
├─37700 /usr/bin/python3.6 /bin/airflow scheduler
├─37701 /usr/bin/python3.6 /bin/airflow scheduler
├─37702 /usr/bin/python3.6 /bin/airflow scheduler
├─37703 /usr/bin/python3.6 /bin/airflow scheduler
├─37704 /usr/bin/python3.6 /bin/airflow scheduler
├─37705 /usr/bin/python3.6 /bin/airflow scheduler
├─37706 /usr/bin/python3.6 /bin/airflow scheduler
├─37707 /usr/bin/python3.6 /bin/airflow scheduler
├─37708 /usr/bin/python3.6 /bin/airflow scheduler
├─37709 /usr/bin/python3.6 /bin/airflow scheduler
├─37710 /usr/bin/python3.6 /bin/airflow scheduler
├─37712 /usr/bin/python3.6 /bin/airflow scheduler
├─37713 /usr/bin/python3.6 /bin/airflow scheduler
├─37714 /usr/bin/python3.6 /bin/airflow scheduler
├─37715 /usr/bin/python3.6 /bin/airflow scheduler
├─37717 /usr/bin/python3.6 /bin/airflow scheduler
├─37718 /usr/bin/python3.6 /bin/airflow scheduler
└─37722 /usr/bin/python3.6 /bin/airflow scheduler
Because many files keep arriving, DAGs are fired constantly, and a large number of DAG tasks end up in this waiting stage. Strangely, we did not have this issue when we were using v1.9. Please advise.
I realized that in the airflow.cfg file, min_file_process_interval was set to 60, which matches the roughly 60-second delay described in the question. Setting it to 0 resolved the problem I reported here.
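A simplified model of why this setting produces the delay, under two assumptions: the scheduler only creates and queues task instances for a newly triggered run when it next processes that DAG's file, and it processes each file at most once every min_file_process_interval seconds (parse time ignored). Under those assumptions, a run triggered just after its file was processed waits almost the full interval. queue_delay is a hypothetical illustration, not Airflow code.

```python
import math


def queue_delay(trigger_time: float, interval: float, start: float = 0.0) -> float:
    """Seconds from triggering a run until the next (modelled) processing pass over its DAG file."""
    if interval <= 0:
        return 0.0  # file is re-processed back to back; parse time is ignored in this model
    # Processing passes happen at start, start + interval, start + 2 * interval, ...
    passes = math.ceil((trigger_time - start) / interval)
    return start + passes * interval - trigger_time


worst_with_60 = max(queue_delay(t, 60) for t in range(1, 121))
worst_with_0 = max(queue_delay(t, 0) for t in range(1, 121))
print(worst_with_60, worst_with_0)
```

In this model the worst-case wait with an interval of 60 is just under 60 seconds, while an interval of 0 removes that part of the delay, likely at the cost of the scheduler re-parsing DAG files more often.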