I am trying to build a DAG in Apache Airflow that inserts one document into a MongoDB collection. I have set schedule_interval="@daily", but the DAG file is parsed every 30 seconds (parsed, not triggered), and the insert is executed every 30 seconds instead of daily. While searching, I found that you can change the 'min_file_process_interval' setting in the 'airflow.cfg' file inside the webserver Docker container, but that did not help.
The desired result is that the code is parsed every 30 seconds but the insert is triggered only daily.
There are similar SO questions, but without an answer: Airflow DAG auto-refreshes every 30 seconds
Minimal reproducible example:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mongo.hooks.mongo import MongoHook

default_args = {
    'owner': 'kw',
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
}
doc_raw = {
    'name': "Chaitanya"
}
MONGO_CONN_ID = 'mongo_conn'
MONGO_COLLECTION = 'airflowtest'

def print_hello():
    return 'Hello world from first Airflow DAG!'

with DAG(
    dag_id='kw_mongo_test4',
    default_args=default_args,
    start_date=datetime(2022, 6, 27),
    schedule_interval="@daily",
    description='use case of mongo operator in airflow',
    catchup=False) as dag:
    task1 = PythonOperator(task_id='hello_task', python_callable=print_hello)
    # The hook is created and insert_one() is called right here, at the top level
    # of the DAG file, so this runs whenever the file is parsed.
    task2 = MongoHook(
        task_id="mongo_insert_test",
        mongo_conn_id=MONGO_CONN_ID,
        mongo_collection=MONGO_COLLECTION,
        mongo_db=MONGO_COLLECTION,
        default_conn_name='mongo_default',
        conn_id='mongo_conn',
        conn_type='mongo_conn',
    ).insert_one(
        mongo_collection='airflowtest',
        doc=doc_raw,
    )
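A hook is different from an operator. You are calling a hook directly in the DAG file, so it is not registered as a task in the DAG (task2 just holds the return value of insert_one). Instead, the insert_one call is ordinary top-level code that runs every time the scheduler parses the file, which by default happens at most every 30 seconds (the [scheduler] min_file_process_interval setting). That is why the document is inserted every 30 seconds regardless of schedule_interval; changing min_file_process_interval only changes how often the file is parsed. I don't see a MongoOperator available, so you need to run your MongoHook call inside a PythonOperator. The callable then executes only when the task runs, i.e. once per @daily run.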
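To illustrate the parse-time versus run-time distinction without needing Airflow installed, here is a minimal sketch. It treats "parsing" as executing the file's top-level code in a fresh namespace, which is a simplification of what the DAG file processor does. FakePythonOperator, fake_insert_one, DAG_FILE and parse_dag_file are hypothetical stand-ins for PythonOperator, MongoHook.insert_one, a DAG file and the scheduler's file processor; none of them are Airflow APIs.

```python
# Hypothetical stand-ins only; no Airflow or MongoDB required.
inserted: list[str] = []  # records every simulated insert_one call


def fake_insert_one(doc: dict) -> None:
    """Stand-in for MongoHook.insert_one: just records the call."""
    inserted.append(doc["name"])


class FakePythonOperator:
    """Stores the callable; nothing runs until execute() is called."""

    def __init__(self, task_id: str, python_callable):
        self.task_id = task_id
        self.python_callable = python_callable

    def execute(self):
        return self.python_callable()


# Source of a toy DAG file: one top-level call (the question's pattern)
# and one call wrapped in a callable (the answer's pattern).
DAG_FILE = """
fake_insert_one({"name": "top-level"})  # runs on every parse

def insert_to_mongo():
    fake_insert_one({"name": "in-task"})  # runs only when the task executes

task2 = FakePythonOperator(task_id="mongo_insert_test", python_callable=insert_to_mongo)
"""


def parse_dag_file() -> dict:
    """Simulate one parse: execute the file's top-level code in a fresh namespace."""
    namespace = {
        "fake_insert_one": fake_insert_one,
        "FakePythonOperator": FakePythonOperator,
    }
    exec(DAG_FILE, namespace)
    return namespace


if __name__ == "__main__":
    for _ in range(3):  # three parse cycles, e.g. 30 seconds apart
        ns = parse_dag_file()
    ns["task2"].execute()  # one scheduled run of the task
    print(inserted)  # ['top-level', 'top-level', 'top-level', 'in-task']
```

Each simulated parse repeats the top-level insert, while the wrapped insert happens only once, when the task is actually executed.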
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mongo.hooks.mongo import MongoHook

default_args = {
    'owner': 'kw',
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
}
doc_raw = {
    'name': "Chaitanya"
}
MONGO_CONN_ID = 'mongo_conn'
MONGO_COLLECTION = 'airflowtest'

def print_hello():
    return 'Hello world from first Airflow DAG!'

def insert_to_mongo():
    # The hook is created and used only when the task executes, not at parse time.
    hook = MongoHook(MONGO_CONN_ID)
    hook.insert_one(mongo_collection=MONGO_COLLECTION, doc=doc_raw)

with DAG(
    dag_id='kw_mongo_test4',
    default_args=default_args,
    start_date=datetime(2022, 6, 27),
    schedule_interval="@daily",
    description='use case of mongo operator in airflow',
    catchup=False) as dag:
    task1 = PythonOperator(task_id='hello_task', python_callable=print_hello)
    task2 = PythonOperator(task_id='mongo_insert_test', python_callable=insert_to_mongo)
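For scale, here is a rough back-of-the-envelope sketch in plain Python. It assumes the default min_file_process_interval of 30 seconds, which is a minimum, so real parses can be less frequent. Under that assumption, the question's top-level insert can fire up to 2,880 times a day, whereas the PythonOperator version inserts once per @daily run. max_parses_per_day and daily_run_times are hypothetical helpers. daily_run_times assumes each @daily run is triggered at the end of its one-day data interval and ignores catchup.

```python
from datetime import datetime, timedelta

# Assumption: default [scheduler] min_file_process_interval of 30 seconds.
PARSE_INTERVAL = timedelta(seconds=30)
DAY = timedelta(days=1)


def max_parses_per_day(interval: timedelta = PARSE_INTERVAL) -> int:
    """Upper bound on how often one DAG file is re-parsed in a day."""
    return DAY // interval  # timedelta // timedelta -> int


def daily_run_times(start: datetime, days: int) -> list[datetime]:
    """Trigger times for @daily: the run for [D, D + 1 day) starts at D + 1 day."""
    return [start + DAY * (i + 1) for i in range(days)]


if __name__ == "__main__":
    print(max_parses_per_day())  # 2880 possible top-level inserts per day
    for run_at in daily_run_times(datetime(2022, 6, 27), 3):
        print(run_at)  # one insert per run with the PythonOperator version
```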