Reputation: 39
I am not sure what I am doing wrong. I am running a DAG and have set retries to 5. My expectation is that if a task fails, it will retry up to 5 times, and if it still doesn't pass it will be marked as failed. Instead, the task was marked as success and the downstream task was triggered, even though it had not completed all the updates required in that task. Am I missing a required parameter? Thanks.
Note: I am using a Dataproc cluster and updating records in a BigQuery table, which has a limitation of 1,500 updates/table/day. That limit was exceeded, but Airflow didn't mark the task as failed; instead it triggered the downstream task. Is there a fix so that the task fails when the limit/quota is exceeded?
Issue: "write to table failed: Reason: 403 limit error due to append/update limit exceeded". After this, it kept running the rest of the code. How can I tell Airflow that this is an error and the task should stop?
def writing_to_bq(df, table, retry):
    for _ in range(retry + 1):
        try:
            df.to_gbq(table, if_exists="append")
            break
        except Exception as e:
            # the exception is caught and only logged, never re-raised
            log.info("Trying again")

writing_to_bq(df=ent1, table=table, retry=3)
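To illustrate the behaviour: even when every attempt fails, the function above returns normally, so the caller never sees an error. A standalone sketch of the same loop, with a stand-in writer instead of my real to_gbq call:

import logging

log = logging.getLogger(__name__)

def writing_to_bq_demo(write_fn, retry):
    # same structure as writing_to_bq, with write_fn standing in for df.to_gbq
    for _ in range(retry + 1):
        try:
            write_fn()
            break
        except Exception:
            log.info("Trying again")

def always_fail():
    raise RuntimeError("write to table failed: 403 limit error")

writing_to_bq_demo(always_fail, retry=3)
print("reached this line, no exception propagated")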
Upvotes: 0
Views: 670
Reputation: 6582
Normally, if you set these args globally (for all your tasks via default_args) or only on the task itself, it should work as expected:
from datetime import timedelta

import airflow

dag_default_args = {
    'owner': 'Airflow',
    'retries': 5,
    'retry_delay': timedelta(minutes=2),
    ...
}

with airflow.DAG(
        "your_dag_id",
        default_args=dag_default_args,
        schedule_interval=None) as dag:
    ....
In this example, I set the retry_delay to 2 minutes.
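For example, a task defined against the dag created above inherits those settings; Airflow only retries when the task's callable raises, so a sketch like this (task_id and callable name are placeholders, and the import path assumes Airflow 2.x) would be retried 5 times, 2 minutes apart, before being marked as failed:

from airflow.operators.python import PythonOperator

def update_bq_records():
    # stand-in for your update logic; raising here is what
    # triggers the retries configured in default_args
    raise RuntimeError("update failed")

update_task = PythonOperator(
    task_id="update_bq_records",
    python_callable=update_bq_records,
    dag=dag,
)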
After discussing this together, you indicated that the BigQueryInsertJobOperator doesn't mark the task as failed when it receives a 403 status for an exceeded quota.
You can also use a PythonOperator with the Python BigQuery client; in that case the client will normally throw an error for a 403 status. If it doesn't, you still have more flexibility to apply logic based on the result:
def execute_bq_query():
    from google.cloud import bigquery

    client = bigquery.Client()
    QUERY = 'SELECT ....'
    # query() / result() raise a google.api_core exception on errors
    # (e.g. Forbidden for a 403), which fails the Airflow task
    query_job = client.query(QUERY)
    rows = query_job.result()
PythonOperator(
    task_id="task_id",
    python_callable=execute_bq_query
)
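If the client doesn't raise in your setup, or you want to handle the quota error explicitly, here is a minimal sketch (the Forbidden handling, the AirflowException re-raise and the function name are my assumptions, not part of your current DAG) that fails the task deliberately on a 403:

from airflow.exceptions import AirflowException
from google.api_core.exceptions import Forbidden
from google.cloud import bigquery

def execute_bq_query_checked():
    client = bigquery.Client()
    try:
        query_job = client.query('SELECT ....')
        return query_job.result()
    except Forbidden as exc:
        # 403 quota/limit exceeded: re-raise so Airflow marks the task as failed
        raise AirflowException(f"BigQuery quota exceeded: {exc}")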
Upvotes: 1