rjk90

Reputation: 23

Why can't I pull an XCom value to use as an argument in the Airflow GCSToGCSOperator?

I'm trying to pass a list of filenames from the first task in my DAG, 'task_get_file_list_click' (task ID 'get_file_list_click'), into the 'source_objects' argument of the 'GCSToGCSOperator' in the second task, 'task_copy_file_list_click', using 'xcom_pull' with the first task's ID.

I am running this in a Cloud Composer instance within a GCP project (Composer v1.20.12, Airflow v2.4.3). The Python code I am using is below.

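# Imports needed by the DAG below. get_file_list, SOURCE_BUCKET, DESTINATION_BUCKET,
# SCHEMA_BUCKET, SERVICE_ACCOUNT and DATASET_NAME are not shown here.
from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.dates import days_ago

with models.DAG(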
        dag_id='stg_ingestion',
        start_date=days_ago(2),
        schedule_interval='@once',
        render_template_as_native_obj=True,
    ) as dag:

    # get file list for clicks
    task_get_file_list_click = PythonOperator(
        task_id='get_file_list_click',
        python_callable=get_file_list,
        op_args=['click'],
        dag=dag
    )

    # copy files from source to destination bucket
    task_copy_file_list_click = GCSToGCSOperator(
        task_id='copy_file_list_click',
        source_bucket=SOURCE_BUCKET,
        source_objects="{{ ti.xcom_pull(task_ids='get_file_list_click') }}",
        destination_bucket=DESTINATION_BUCKET,
        impersonation_chain=SERVICE_ACCOUNT,
        dag=dag
    )

    # load click file(s) to BigQuery
    task_load_csv_files_click = GCSToBigQueryOperator(
        task_id='load_csv_files_click',
        bucket=SOURCE_BUCKET,
        impersonation_chain=SERVICE_ACCOUNT,
        source_objects="{{ ti.xcom_pull(task_ids='get_file_list_click') }}",
        compression='GZIP',
        destination_project_dataset_table=f"{DATASET_NAME}.dcm_click",
        schema_object_bucket=SCHEMA_BUCKET,
        schema_object="dags/resources/json/dcm_click.json",
        write_disposition='WRITE_TRUNCATE',
        skip_leading_rows=1,
        trigger_rule='all_success',
        dag=dag,
    )
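
For context on 'render_template_as_native_obj=True' in the DAG above: with that flag set, Airflow renders templates through Jinja's native environment, so a template that evaluates to a list reaches the operator as a Python list rather than as its string form. The stdlib-only sketch below approximates that difference. The filenames are made up, and 'ast.literal_eval' is only a stand-in for Jinja's native rendering, not Airflow's actual code path.

```python
import ast

# Hypothetical XCom value returned by the first task (filenames are made up).
file_list = ["click_1.csv.gz", "click_2.csv.gz"]

# Default (string) rendering: the template result is the list's text form.
rendered = str(file_list)

# Native-style rendering: the text is parsed back into a Python object.
native = ast.literal_eval(rendered)

print(type(rendered).__name__, type(native).__name__)
```

Under string rendering, an operator that iterates over 'source_objects' would walk the characters of one long string; under native rendering it receives the individual filenames.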

I receive the following error on the second task:

"TypeError: 'NoneType' object is not subscriptable"
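
This message is what Python raises when code indexes or slices a value that is None. A minimal illustration follows; 'value' is a hypothetical stand-in, since the traceback is not shown here and it is not established which attribute inside the operator is None.

```python
value = None  # hypothetical stand-in for whatever is None inside the operator

try:
    value[0]  # indexing None triggers the error
except TypeError as exc:
    message = str(exc)

print(message)
```

The full traceback in the task log should show which line performs the indexing, and therefore which argument or attribute is None at that point.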

However, the third task, 'task_load_csv_files_click', succeeds: the list of filenames is pulled from XCom and used as the 'source_objects' argument of the 'GCSToBigQueryOperator'.

I can't understand how exactly the same xcom_pull syntax works in one task but not in the other. Any advice would be appreciated.

Upvotes: 0

Views: 260

Answers (0)
