Javier Lopez Tomas

Reputation: 2362

Airflow DAG successfully executed but tasks didn't run

I have a DAG in Airflow with a single task (a PythonOperator). When I trigger it manually from the GUI, the run is marked as successful, but the task itself is never executed, so the DAG does nothing. The DAG code is the following:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks import MySqlHook
import pandas as pd
import datetime as dt
import json
from datetime import timedelta

default_args = {
        'owner': 'airflow',
        'start_date': dt.datetime(2019,8,29,18,0,0),
        'concurrency':1,
        'retries':3
        }

def extraction_from_raw_data(conn_id):
    mysqlserver = MySqlHook(conn_id)
    query = """select * from antifraud.email_fraud_risk
            WHERE ts >= DATE_ADD(CURDATE(), INTERVAL -3 DAY)"""
    raw_data = mysqlserver.get_records(query)
    raw_data = pd.DataFrame(raw_data)

    data_as_list = []

    for i in range(len(raw_data)):
        dict1 = {}
        dict1.update(json.loads(raw_data.at[i,'raw_content']))
        data_as_list.append(dict1)

    json_data_df = pd.DataFrame(data_as_list)

    final_data = pd.concat([raw_data['email_id'],json_data_df],axis=1)

    return final_data

with DAG('emailage_data',
         default_args=default_args,
         schedule_interval = timedelta(days=1)
         ) as dag:
    extraction_from_raw_data = PythonOperator(
        task_id = 'extraction_from_raw_data',
        op_args = {'conn_id':'services'},
        python_callable = extraction_from_raw_data)

extraction_from_raw_data

The worker, scheduler and webserver are all working correctly: I can run a hello_world DAG (and its tasks) successfully.

Upvotes: 8

Views: 6405

Answers (1)

absolutelydevastated

Reputation: 1757

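In general, you should always leave at least one full schedule interval between your DAG's start_date and the current time. With a start_date of 2019-08-29 18:00 and a schedule_interval of one day, the scheduler will not start the first run until after 2019-08-30 18:00.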

The Airflow documentation states:

"Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended."

"Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period."

https://airflow.apache.org/scheduler.html
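
To make the timing rule concrete, here is a minimal sketch in plain Python (no Airflow import needed) that applies it to the start_date and schedule_interval from the question. The helper name first_trigger_time is hypothetical and only illustrates the arithmetic; it is not part of the Airflow API.

```python
import datetime as dt


def first_trigger_time(start_date, schedule_interval):
    """Hypothetical helper: earliest moment the scheduler can launch the
    first run, i.e. the END of the first period after start_date."""
    return start_date + schedule_interval


# Values taken from the DAG in the question.
start_date = dt.datetime(2019, 8, 29, 18, 0, 0)
schedule_interval = dt.timedelta(days=1)

first_run = first_trigger_time(start_date, schedule_interval)
print(first_run)  # 2019-08-30 18:00:00
```

If the current time is earlier than the value printed here, the scheduler has no completed period to run yet, so it is worth checking this before looking for other causes.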

Upvotes: 7
