Reputation: 53
Context
I'm trying to build an ingestion pipeline on Google Cloud Platform using Composer, DataProc and BigQuery. I have a table in BigQuery that contains one record per data source and its corresponding file. So if there are 5 files I need to ingest, there are 5 records in the BigQuery table. Tomorrow it could be a different number of files. Hence, I thought about building the tasks dynamically within my DAG.
The high level design is as follows:
This setup runs fine. I can see my DAG and all dynamically generated tasks in the Airflow UI.
Edit: Just adding a few more details. The BigQuery table will have fewer than 25 records, so querying the table is not a concern; querying it every 30 seconds is. Secondly, I only need this DAG to run once every 4 hours or so, and I do not intend to keep Composer running for the whole time. All I need every 4 hours is to boot Composer, run the DAG once to process all available files, and then shut it down.
The problem
While these DataProc tasks are executing, Airflow refreshes the DAG after about a couple of minutes and runs the same set of tasks again. In the DataProc Jobs console, I see 2 (sometimes 3) instances of the same job in the running state. This is undesirable.
What I have tried
I have set retries=0 at the task level, and on the DAG I have set catchup=False, max_active_runs=1, and schedule_interval='@once'. The default arguments for the DAG also have retries=0.
I think the issue is that the part where I pull records from BigQuery is an ordinary function rather than a task in itself. The reason I have not put it in a task is that I couldn't find a way to pass the fetched result from BigQuery into the subsequent tasks where I have to loop over it.
I tried calling a PythonOperator that executes Variable.set("df", df), in the hope that I could then loop over Variable.get("df"), but that didn't work out either.
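For reference, that attempt looked roughly like the sketch below. The JSON serialization and the task/function names are my own placeholders, and the dag object and fetch_pending_files_from_bq refer to the snippet further down; this is not the exact original code:
import json

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def stage_pending_files():
    # Run the BigQuery query inside a task and persist the result as an
    # Airflow Variable. Variables are stored as strings, so the dataframe
    # has to be serialized first (JSON here).
    df = fetch_pending_files_from_bq()
    Variable.set("df", df.to_json(orient="records"))


stage_task = PythonOperator(
    task_id="stage_pending_files",
    python_callable=stage_pending_files,
    dag=dag,
)

# The loop that builds the DataProc tasks still runs at DAG-parse time,
# before stage_pending_files has ever executed, so there is nothing to
# read yet -- which is why this approach didn't work for me:
pending_files = json.loads(Variable.get("df", default_var="[]"))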
Sharing relevant code below.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator

# yesterday, dagid, dataproc_args and dataproc_jars are defined elsewhere


def fetch_pending_files_from_bq():
    # fetch records from BigQuery and return them as a pandas dataframe
    ...


default_args = {
    'start_date': yesterday,
    'default_timezone': 'utc',
    'retries': 0
}

dag = DAG(
    dagid,
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    description='DAG to ingest data',
    schedule_interval='@once'
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)

# runs at module level, i.e. every time the scheduler parses the DAG file
pending_files_df = fetch_pending_files_from_bq()

for index, row in pending_files_df.iterrows():
    task = DataProcSparkOperator(
        dag=dag,
        task_id=row["file_name"],
        arguments=dataproc_args,
        region="us-east1",
        job_name="job_{}".format(row["file_name"]),
        dataproc_spark_jars=dataproc_jars,
        ....
        ....
    )
    task.set_upstream(start_dag)
    task.set_downstream(end_dag)
I get the orchestration that I want; the only issue is that my DataProc jobs get re-run automatically.
Any ideas are appreciated.
Upvotes: 0
Views: 1186
Reputation: 53
While diving deeper into the design, I realized that fetch_pending_files_from_bq is not a task, so it gets executed every time the DAG file is refreshed (parsed) by the scheduler. That led to repeated queries and also caused the unexpected duplicate runs. Hence I dropped this design.
I was able to fix this using subdags. The first subdag reads from BQ and writes the list of pending files to GCS. The second subdag reads that file from GCS and creates the tasks dynamically.
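A rough sketch of that design is below, reusing the names from the snippet in the question (dag, dagid, default_args, fetch_pending_files_from_bq, dataproc_args, dataproc_jars). The bucket name, object name, and JSON layout are placeholders I picked for illustration, not the original implementation:
import json

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
from google.cloud import storage

BUCKET = "my-staging-bucket"       # placeholder
OBJECT = "pending_files.json"      # placeholder


def stage_subdag(parent_dag_id, child_task_id, args):
    # Subdag 1: query BigQuery inside a real task and stage the file
    # list in GCS so the second subdag can read it cheaply.
    subdag = DAG("{}.{}".format(parent_dag_id, child_task_id),
                 default_args=args, schedule_interval='@once')

    def stage():
        df = fetch_pending_files_from_bq()
        storage.Client().bucket(BUCKET).blob(OBJECT).upload_from_string(
            df.to_json(orient="records"))

    PythonOperator(task_id="write_pending_files_to_gcs",
                   python_callable=stage, dag=subdag)
    return subdag


def ingest_subdag(parent_dag_id, child_task_id, args):
    # Subdag 2: read the staged file list from GCS (a cheap read at
    # parse time) and create one DataProc task per pending file.
    subdag = DAG("{}.{}".format(parent_dag_id, child_task_id),
                 default_args=args, schedule_interval='@once')

    blob = storage.Client().bucket(BUCKET).get_blob(OBJECT)
    rows = json.loads(blob.download_as_string()) if blob else []

    for row in rows:
        DataProcSparkOperator(
            dag=subdag,
            task_id=row["file_name"],
            job_name="job_{}".format(row["file_name"]),
            region="us-east1",
            arguments=dataproc_args,
            dataproc_spark_jars=dataproc_jars,
        )
    return subdag


stage = SubDagOperator(task_id="stage",
                       subdag=stage_subdag(dagid, "stage", default_args),
                       dag=dag)
ingest = SubDagOperator(task_id="ingest",
                        subdag=ingest_subdag(dagid, "ingest", default_args),
                        dag=dag)
stage >> ingest
The point of the split is that the BigQuery query now only runs when the first subdag's task actually executes, while the per-file tasks are generated from an inexpensive GCS read at parse time.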
Upvotes: 1