Rajat

Reputation: 53

Airflow - Generating tasks dynamically from BigQuery, but tasks are re-run before the previous run finishes

Context

I'm trying to build an ingestion pipeline on Google Cloud Platform using Composer, DataProc and BigQuery. I have a table in BigQuery that contains one record per data source and its corresponding file. So if there are 5 files to ingest, there are 5 records in the BigQuery table; tomorrow it could be a different number of files. Hence, I thought about generating the tasks dynamically within my DAG.

The high-level design is as follows: a function fetches the list of pending files from BigQuery, and the DAG loops over the result to create one DataProc Spark task per file, wired between dummy start and end tasks.

This setup runs fine. I can see my DAG and all dynamically generated tasks in the Airflow UI.

Edit: Just adding a few more details. The BigQuery table will have fewer than 25 records, so querying the table is not a concern; querying it every 30 seconds is. Secondly, I only need this DAG to run once every 4 hours or so, and I do not intend to keep Composer running in between. All I need every 4 hours is to boot up Composer, run the DAG once to process all available files, and then shut it down.

The problem

While these DataProc tasks are executing, after a couple of minutes Airflow refreshes the DAG and runs the same set of tasks again. In the DataProc Jobs console, I see 2 (sometimes 3) instances of the same task in the running state. This is undesirable.

What I have tried

I have set retries=0 at the task level and on the DAG I have set catchup=False, max_active_runs=1, and schedule_interval='@once'. Default arguments for the DAG also have retries=0.

I think the issue is that the part where I pull records from BigQuery is an ordinary function rather than a task in itself. The reason I have not made it a task is that I couldn't find a way to pass the results fetched from BigQuery into the part of the DAG that loops over them to create the subsequent tasks.

I tried calling a PythonOperator that executes Variable.set("df", df), in the hope that I could then loop over Variable.get("df"), but that didn't work out either.
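For reference, a minimal sketch of that attempt, reusing fetch_pending_files_from_bq and dag from the snippet below (the task and Variable names here are only illustrative; since Airflow Variables store strings, the DataFrame would need to be serialized first, e.g. with to_json()):

# Minimal sketch of the Variable-based attempt (names are illustrative).
import pandas as pd
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

def store_pending_files():
    # Variables only store strings, so serialize the DataFrame first
    df = fetch_pending_files_from_bq()
    Variable.set("pending_files", df.to_json(orient="records"))

store_pending_files_task = PythonOperator(
    task_id="store_pending_files",
    python_callable=store_pending_files,
    dag=dag,
)

# The loop that builds the tasks still runs at parse time, outside any task,
# so reading the Variable back here does not avoid the re-parsing problem:
pending_files_df = pd.read_json(Variable.get("pending_files", default_var="[]"))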

Sharing relevant code below.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator


def fetch_pending_files_from_bq():
    # fetch pending records from BigQuery and return them as a pandas DataFrame
    ...

# start_date one day in the past
yesterday = datetime.combine(
    datetime.today() - timedelta(days=1), datetime.min.time())

default_args = {
    'start_date': yesterday,
    'default_timezone': 'utc',
    'retries': 0
}

dag = DAG(
    dagid,  # DAG id string, defined elsewhere in the file
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    description='DAG to ingest data',
    schedule_interval='@once'
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)

# NOTE: this runs at module level, i.e. every time the scheduler parses the DAG file
pending_files_df = fetch_pending_files_from_bq()

# one DataProc Spark task per pending file
for index, row in pending_files_df.iterrows():
    task = DataProcSparkOperator(
        dag=dag,
        task_id=row["file_name"],
        arguments=dataproc_args,
        region="us-east1",
        job_name="job_{}".format(row["file_name"]),
        dataproc_spark_jars=dataproc_jars,
        ....
        ....
    )

    task.set_upstream(start_dag)
    task.set_downstream(end_dag)

I get the orchestration that I want; the only issue is that my DataProc jobs get re-run automatically.

Any ideas are appreciated.

Upvotes: 0

Views: 1186

Answers (1)

Rajat

Reputation: 53

While diving deeper into the design, I realized that fetch_pending_files_from_bq is not a task, so it gets executed every time the DAG file is parsed. That led to repeated queries and also caused the unexpected creation of duplicate tasks. Hence I dropped this design.

I was able to fix this using subdags. The first subdag reads from BigQuery and writes the result to GCS. The second subdag reads that file from GCS and creates the tasks dynamically.
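A rough sketch of what that looks like, reusing dag, default_args, start_dag, end_dag and fetch_pending_files_from_bq from the snippet in the question (the bucket, blob and task names here are purely illustrative):

# Rough sketch of the two-subdag design (bucket/blob/task names are illustrative).
import json

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.dataproc_operator import DataProcSparkOperator
from google.cloud import storage

BUCKET = "my-staging-bucket"         # illustrative
PENDING_BLOB = "pending_files.json"  # illustrative


def export_subdag(parent_dag_id, child_dag_id, args):
    # Subdag 1: query BigQuery inside a task and stage the result in GCS.
    subdag = DAG("{}.{}".format(parent_dag_id, child_dag_id),
                 default_args=args, schedule_interval="@once", catchup=False)

    def export_to_gcs():
        df = fetch_pending_files_from_bq()
        storage.Client().bucket(BUCKET).blob(PENDING_BLOB) \
            .upload_from_string(df.to_json(orient="records"))

    PythonOperator(task_id="export_to_gcs", python_callable=export_to_gcs, dag=subdag)
    return subdag


def ingest_subdag(parent_dag_id, child_dag_id, args):
    # Subdag 2: read the staged file list and create one DataProc task per file.
    subdag = DAG("{}.{}".format(parent_dag_id, child_dag_id),
                 default_args=args, schedule_interval="@once", catchup=False)

    blob = storage.Client().bucket(BUCKET).blob(PENDING_BLOB)
    pending = json.loads(blob.download_as_string()) if blob.exists() else []

    for row in pending:
        DataProcSparkOperator(
            dag=subdag,
            task_id=row["file_name"],
            job_name="job_{}".format(row["file_name"]),
            region="us-east1",
            # arguments=dataproc_args, dataproc_spark_jars=dataproc_jars, etc.
        )
    return subdag


export_task = SubDagOperator(
    task_id="export_pending_files",
    subdag=export_subdag(dag.dag_id, "export_pending_files", default_args),
    dag=dag,
)

ingest_task = SubDagOperator(
    task_id="ingest_pending_files",
    subdag=ingest_subdag(dag.dag_id, "ingest_pending_files", default_args),
    dag=dag,
)

start_dag >> export_task >> ingest_task >> end_dag

The key difference is that the BigQuery query now only runs inside a task, while the parse-time work in the second subdag is just a cheap read of the staged file from GCS.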

Upvotes: 1
