SATYAM CHATTERJEE

Reputation: 21

Airflow DAG status is Success, but the task state says the DAG has yet to run

I am using Airflow 2.3.4 and I am triggering the DAG with a config. When I hardcode the config values, the DAG runs successfully, but when I trigger it after passing a config, my tasks never start, yet the DAG status turns green (Success). Please help me understand what's going wrong!

from datetime import datetime, timedelta
from airflow import DAG
from pprint import pprint
from airflow.operators.python import PythonOperator
from operators.jvm import JVMOperator

args = {
    'owner': 'satyam',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag_params = {
    'dag_id': 'synthea_etl_end_to_end_with_config',
    'start_date': datetime.utcnow(),
    'end_date': datetime(2025, 2, 5),
    'default_args': args,
    'schedule_interval': timedelta(hours=4)
}


dag = DAG(**dag_params)

# [START howto_operator_python]


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    pprint(ds)
    return 'Whatever you return gets printed in the logs'


jvm_task = JVMOperator(
    task_id='jvm_task',
    correlation_id='123456',
    jar='/home/i1136/Synthea/synthea-with-dependencies.jar',
    options={
        'java_args': [''],
        'jar_args': [
            "-p {{ dag_run.conf['population_count'] }} "
            "--exporter.fhir.export {{ dag_run.conf['fhir'] }} "
            "--exporter.ccda.export {{ dag_run.conf['ccda'] }} "
            "--exporter.csv.export {{ dag_run.conf['csv'] }} "
            "--exporter.csv.append_mode {{ dag_run.conf['csv'] }} "
            "--exporter.baseDirectory /home/i1136/Synthea/output_dag_config"
        ],
    })

print_context_task = PythonOperator(
    task_id='print_context_task',
    python_callable=print_context,  # Airflow 2.x always passes the context; provide_context is no longer needed
    dag=dag,
)

jvm_task.set_downstream(print_context_task)
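
For reference, this is how I expect the config to show up inside a task once it actually runs (a minimal sketch; the keys are the ones my templates reference):

def show_conf(**kwargs):
    # dag_run.conf holds the JSON passed via "Trigger DAG w/ config".
    # The keys below ('population_count', 'fhir', 'ccda', 'csv') are the
    # ones referenced by the jar_args templates above.
    conf = kwargs['dag_run'].conf or {}
    for key in ('population_count', 'fhir', 'ccda', 'csv'):
        print(key, '=', conf.get(key))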

Upvotes: 1

Views: 3480

Answers (2)

HankerPL

Reputation: 21

In my case it was a small bug in the Python script that Airflow did not detect after refreshing.
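
One quick way to surface such a bug is to import the DAG file directly, outside Airflow, so any parse-time error is raised instead of being silently swallowed (a sketch; the file path is a placeholder):

import importlib.util

# Importing the DAG file by hand raises any syntax or top-level runtime
# error that might otherwise only appear as an import error in the UI.
spec = importlib.util.spec_from_file_location('dag_check', '/opt/airflow/dags/my_dag.py')
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
print('DAG file imported cleanly')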

Upvotes: 0

Hussein Awala

Reputation: 5110

The problem is with 'start_date': datetime.utcnow(), which is always >= the dag_run's start date, so Airflow marks the run as succeeded without running it. It's better to set this variable to the minimum date of your runs; if you don't have one, you can use yesterday's date, but then the next day you will not be able to re-run the tasks that failed on the previous day:

import pendulum

dag_params = {
    ...,
    # re-evaluated on every DAG parse, so start_date moves forward each day
    'start_date': pendulum.yesterday(),
    ...,
}
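
If you know the earliest date you ever want runs for, a fixed timestamp in the past is the safer choice and avoids the re-run limitation above (a sketch; the date itself is just an example):

import pendulum

dag_params = {
    ...,
    # a fixed date in the past never moves, so re-running
    # yesterday's failed tasks keeps working
    'start_date': pendulum.datetime(2022, 1, 1, tz='UTC'),
    ...,
}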

Upvotes: 7
