Reputation: 1035
I do not know whether I am missing some Airflow scheduler knowledge or whether this is a potential Airflow bug.
The situation is like this:
"start_date": airflow.utils.dates.days_ago(1),
Version: 1.10.14. It runs on Kubernetes in Azure.
Task Instance Details
Dependencies Blocking Task From Getting Scheduled
Dependency | Reason
Task Instance State | Task is in the 'up_for_retry' state which is not a valid state for execution. The task must be cleared in order to be run.
Not In Retry Period | Task is not ready for retry yet but will be retried automatically. Current date is 2021-05-17T09:06:57.239015+00:00 and task will be retried at 2021-05-17T09:09:50.662150+00:00.
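The "Not In Retry Period" message can be read backwards to estimate when the first attempt ended. The sketch below is only arithmetic on the two timestamps quoted above; it assumes that retry_delay was left at Airflow's 5-minute default and that exponential backoff is not enabled (neither is set in the DAG arguments), so the retry time is the end of the first attempt plus retry_delay. The variable names are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Timestamps copied from the "Not In Retry Period" message.
current_date = datetime(2021, 5, 17, 9, 6, 57, 239015, tzinfo=timezone.utc)
retry_at = datetime(2021, 5, 17, 9, 9, 50, 662150, tzinfo=timezone.utc)

# Assumption: retry_delay is the 5-minute default and there is no
# exponential backoff, so retry_at = end of first attempt + retry_delay.
retry_delay = timedelta(minutes=5)

first_attempt_ended = retry_at - retry_delay
remaining_wait = retry_at - current_date

print("first attempt ended:", first_attempt_ended.isoformat())
print("remaining wait:", remaining_wait)
```

Under those assumptions, a first attempt had already ended a couple of minutes before the page was viewed, which is what puts the task in 'up_for_retry'.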
Am I missing something needed to judge whether this is a bug or expected behavior?
Addition: below is the DAG definition, as requested.
import airflow
from airflow import DAG
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
from airflow.models import Variable
dag_args = {
    "owner": "our_project_team_name",
    "retries": 1,
    "email": ["ouremail_address_replaced_by_this_string"],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
}
# Implement cluster reuse on Databricks, pick from light, medium, heavy cluster type based on workloads
clusters = Variable.get("our_project_team_namejob_cluster_config", deserialize_json=True)
databricks_connection = "our_company_databricks"
adl_connection = "our_company_wasb"
pipeline_name = "process_our_data_from_boomi"
dag = DAG(dag_id=pipeline_name, default_args=dag_args, schedule_interval="0 3 * * *")
notebook_dir = "/Shared/our_data_name"
lib_path_sub = ""
lib_name_dev_plus_branch = ""
atlas_library = {
    "whl": f"dbfs:/python-wheels/atlas{lib_path_sub}/atlas_library-0{lib_name_dev_plus_branch}-py3-none-any.whl"
}
create_our_data_name_source_data_from_boomi_notebook_params = {
    "existing_cluster_id": clusters["our_cluster_name"],
    "notebook_task": {
        "notebook_path": f"{notebook_dir}/create_our_data_name_source_data_from_boomi",
        "base_parameters": {"Extraction_date": "{{ ds_nodash }}"},
    },
}
create_our_data_name_standardized_table_from_source_xml_notebook_params = {
    "existing_cluster_id": clusters["our_cluster_name"],
    "notebook_task": {
        "notebook_path": f"{notebook_dir}/create_our_data_name_standardized_table_from_source_xml",
        "base_parameters": {"Extraction_date": "{{ ds_nodash }}"},
    },
}
create_our_data_name_enriched_table_from_standardized_notebook_params = {
    "existing_cluster_id": clusters["our_cluster_name"],
    "notebook_task": {
        "notebook_path": f"{notebook_dir}/create_our_data_name_enriched",
        "base_parameters": {"Extraction_date": "{{ ds_nodash }}"},
    },
}
layer_1_task = DatabricksSubmitRunOperator(
    task_id="Load_our_data_name_to_source",
    databricks_conn_id=databricks_connection,
    dag=dag,
    json=create_our_data_name_source_data_from_boomi_notebook_params,
    libraries=[atlas_library],
)
layer_2_task = DatabricksSubmitRunOperator(
    task_id="Load_our_data_name_to_standardized",
    databricks_conn_id=databricks_connection,
    dag=dag,
    json=create_our_data_name_standardized_table_from_source_xml_notebook_params,
    libraries=[
        {"maven": {"coordinates": "com.databricks:spark-xml_2.11:0.5.0"}},
        {"pypi": {"package": "inflection"}},
        atlas_library,
    ],
)
layer_3_task = DatabricksSubmitRunOperator(
    task_id="Load_our_data_name_to_enriched",
    databricks_conn_id=databricks_connection,
    dag=dag,
    json=create_our_data_name_enriched_table_from_standardized_notebook_params,
    libraries=[atlas_library],
)
layer_1_task >> layer_2_task >> layer_3_task
Upvotes: 0
Views: 3472
Reputation: 1035
After some help from @AnandVidvat, who suggested an experiment with retries set to 0, and from a friend, who suggested swapping the operator for either a DummyOperator or a PythonOperator, I can confirm that the issue has nothing to do with the Databricks operator or with Airflow 1.10.x, i.e. it is not an Airflow bug.
In summary: when a DAG has a meaningful operator, the first execution fails in my setup without producing any task log, and the retry then works OK. The task log hides the fact that the task was retried, because the failed attempt left no log.
To reduce the total run time until the real cause is found, the workaround is to set retry_delay to 10 seconds (the default is 5 minutes, which makes the DAG run unnecessarily long).
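As a minimal sketch of that workaround, retry_delay can be added to the default arguments as a timedelta. The other keys are copied from the test DAG in this answer; only the retry_delay entry is new, and the dictionary is shown on its own without the DAG around it.

```python
from datetime import timedelta

dag_args = {
    "owner": "min_test",
    "retries": 1,
    # Workaround: wait 10 seconds before the retry instead of the
    # 5-minute default, so the silent first failure costs less time.
    "retry_delay": timedelta(seconds=10),
    "depends_on_past": False,
}
```

This only shortens the wait before the retry; the first attempt still fails, so it does not replace finding the real cause.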
The next step is to figure out what causes this first failure, by checking the logs on the scheduler or worker pods in our current setup (Azure K8s, PostgreSQL, Redis, Celery executor).
P.S. I used the DAG below for testing and to reach this conclusion.
import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import time
from pprint import pprint
dag_args = {
    "owner": "min_test",
    "retries": 1,
    "email": ["[email protected]"],
    "email_on_failure": True,
    "email_on_retry": True,
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
}
pipeline_name = "min_test_debug_airflow_baseline_PythonOperator_1_retry"
dag = DAG(
    dag_id=pipeline_name,
    default_args=dag_args,
    schedule_interval="0 3 * * *",
    tags=["min_test_airflow"],
)


def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)


def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"


run_this = PythonOperator(
    task_id="print_the_context",
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)

# Generate 3 sleeping tasks, sleeping 0.0, 0.1 and 0.2 seconds respectively
for i in range(3):
    task = PythonOperator(
        task_id="sleep_for_" + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={"random_base": float(i) / 10},
        dag=dag,
    )
    task.set_upstream(run_this)
Upvotes: 1