Kay

Reputation: 19650

How to trigger a dag to run immediately

I am attempting to build an ETL pipeline. As the first step, I am fetching some data from an API. I would like this DAG to run immediately when I call the script:

python dag.py

Also, after calling this script I expected to see the DAG in the webserver dashboard, but I don't see it.

dag.py

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from pipeline import Pipeline
import asyncio

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('data', default_args=default_args, schedule_interval=timedelta(days=1))

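# provide_context=True passes the Airflow context as extra keyword
# arguments, so the callable has to accept them.
def fetch_user_ids(twitter_handle_name, **context):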
    pipeline = Pipeline()
    twitter_pipeline = pipeline.twitter_pipeline(twitter_handle_name)
    asyncio.run(twitter_pipeline.fetch_user_ids())

run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=fetch_user_ids,
    op_kwargs={'twitter_handle_name': "MENnewsdesk"},
    dag=dag,
)

Right now I have defined only one task, but in the future the DAG will have multiple tasks.

Upvotes: 1

Views: 4214

Answers (1)

ganesh_patil

Reputation: 366

The python dag.py command only checks that the file executes without errors; it does not run the DAG. To see the DAG in the webserver and have it run, place dag.py in the DAGs folder (the dags_folder setting in airflow.cfg). Airflow automatically reads the files in that folder, loads the DAG into the webserver, and starts running it according to the start_date defined in default_args. Because your start_date is datetime(2015, 6, 1) and your schedule interval is one day, Airflow will create one DAG run for each day from that date up to the current date. So I think you need to change the start_date.
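To make that backfill arithmetic concrete, here is a minimal sketch in plain Python (no Airflow needed) that counts the daily intervals between the start_date and a given moment. The helper name count_backfill_runs and the 2019-06-01 "now" are assumptions chosen for illustration, and the sketch assumes catchup is enabled so that one run is created per elapsed interval.

```python
from datetime import datetime, timedelta


def count_backfill_runs(start_date, interval, now):
    """Count the schedule intervals that have fully elapsed between start_date and now.

    Hypothetical helper for illustration; it is not part of Airflow.
    """
    if now <= start_date:
        return 0
    # timedelta // timedelta yields the whole number of elapsed intervals.
    return (now - start_date) // interval


# start_date and interval are taken from the question; "now" is an assumed example date.
runs = count_backfill_runs(datetime(2015, 6, 1), timedelta(days=1), datetime(2019, 6, 1))
print(runs)  # 1461
```

Moving the start_date close to the present brings this number down to a handful of runs or to zero.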

If you want to trigger this DAG manually, set schedule_interval=None and use airflow trigger_dag dag_id (Documentation: airflow trigger dag)
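As a rough sketch of what that could look like, here is the question's DAG definition reduced to a manually triggered form. It keeps the Airflow 1.x import path from the question; the body of fetch_user_ids is a placeholder, since the Pipeline class is not shown, and the file is assumed to live in the DAGs folder.

```python
from datetime import datetime

from airflow import DAG
# Airflow 1.x import path, as used in the question.
from airflow.operators.python_operator import PythonOperator


def fetch_user_ids(twitter_handle_name):
    # Placeholder body (assumption): the real code would call the Pipeline class.
    print("fetching user ids for", twitter_handle_name)


dag = DAG(
    'data',
    start_date=datetime(2015, 6, 1),
    schedule_interval=None,  # no schedule: the DAG only runs when triggered
)

run_this = PythonOperator(
    task_id='run_this',
    python_callable=fetch_user_ids,
    op_kwargs={'twitter_handle_name': 'MENnewsdesk'},
    dag=dag,
)
```

With no schedule there is nothing to backfill, so the old start_date is harmless here, and a run could then be started with airflow trigger_dag data.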

Thanks.

Upvotes: 3
