I'm trying to pass a list of filenames from the first task in my DAG, 'task_get_file_list_click' (task ID 'get_file_list_click'), to the 'GCSToGCSOperator' in the second task, 'task_copy_file_list_click', by calling 'xcom_pull' with the first task's ID.
I am running this in a Cloud Composer instance in a GCP project (Composer v1.20.12, Airflow v2.4.3). The Python code I am using is below.
from airflow import models
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
from airflow.utils.dates import days_ago

# SOURCE_BUCKET, DESTINATION_BUCKET, SCHEMA_BUCKET, DATASET_NAME, SERVICE_ACCOUNT
# and get_file_list are defined elsewhere in the DAG file (not shown).

with models.DAG(
    dag_id='stg_ingestion',
    start_date=days_ago(2),
    schedule_interval='@once',
    render_template_as_native_obj=True,
) as dag:
    # get file list for clicks
    task_get_file_list_click = PythonOperator(
        task_id='get_file_list_click',
        provide_context=True,  # deprecated no-op in Airflow 2; context is passed automatically
        python_callable=get_file_list,
        op_args=['click'],
        dag=dag
    )
    # copy files from source to destination bucket
    task_copy_file_list_click = GCSToGCSOperator(
        task_id='copy_file_list_click',
        source_bucket=SOURCE_BUCKET,
        source_objects="{{ ti.xcom_pull(task_ids='get_file_list_click') }}",
        destination_bucket=DESTINATION_BUCKET,
        impersonation_chain=SERVICE_ACCOUNT,
        dag=dag
    )
    # load click file(s) to BigQuery
    task_load_csv_files_click = GCSToBigQueryOperator(
        task_id='load_csv_files_click',
        bucket=SOURCE_BUCKET,
        impersonation_chain=SERVICE_ACCOUNT,
        source_objects="{{ ti.xcom_pull(task_ids='get_file_list_click') }}",
        compression='GZIP',
        destination_project_dataset_table=f"{DATASET_NAME}.dcm_click",
        schema_object_bucket=SCHEMA_BUCKET,
        schema_object="dags/resources/json/dcm_click.json",
        write_disposition='WRITE_TRUNCATE',
        skip_leading_rows=1,
        trigger_rule='all_success',
        dag=dag,
    )
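The DAG code references `get_file_list` without showing it. For reference, here is a minimal sketch of the shape such a callable usually has. Everything in it is hypothetical: the helper `select_files`, the extra `object_names` parameter, and the rule of keeping `.csv.gz` objects whose name contains the file type are assumptions (the `.csv.gz` guess only comes from `compression='GZIP'` in the load task). Object names are passed in so the sketch runs without GCP.

```python
from typing import Iterable, List


def select_files(object_names: Iterable[str], file_type: str) -> List[str]:
    """Hypothetical filter: keep gzipped CSV objects whose name contains file_type."""
    return sorted(
        name
        for name in object_names
        if file_type in name and name.endswith(".csv.gz")
    )


def get_file_list(file_type: str, object_names: Iterable[str] = ()) -> List[str]:
    """Hypothetical stand-in for the get_file_list callable used in the DAG.

    In a real DAG the names would come from listing the source bucket, for
    example GCSHook().list(SOURCE_BUCKET); here they are a parameter so the
    sketch runs locally. With op_args=['click'], file_type is 'click'.
    """
    files = select_files(object_names, file_type)
    # The explicit return matters: the callable's return value is pushed to
    # XCom under the key 'return_value', which is what
    # ti.xcom_pull(task_ids='get_file_list_click') reads. A callable that
    # returns nothing pushes no XCom, so the pull yields None.
    return files
```

Because the returned list holds only strings, it serialises cleanly with Airflow 2's default JSON-based XCom backend.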
I receive the following error on the second task:
"NoneType object is not subscriptable"
However, the third task, 'task_load_csv_files_click', succeeds: the list of filenames IS pulled from XCom and used as the 'source_objects' argument of the 'GCSToBigQueryOperator'.
I don't understand how exactly the same xcom_pull syntax can work in one task but not in the other. Any advice appreciated.
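"'NoneType' object is not subscriptable" is the generic TypeError Python raises whenever code indexes into None, so the message alone does not say whether the pulled XCom was None or whether some other value inside the operator was. One way to narrow it down is a temporary debugging task that logs what the pull actually returns. The sketch below is hypothetical: `as_object_list` and `check_file_list` are illustrative names, not Airflow APIs. It relies only on `ti.xcom_pull(task_ids=...)` and on Airflow 2 passing `ti` to a PythonOperator callable that names it in its signature.

```python
import ast
from typing import Any, List


def as_object_list(value: Any) -> List[str]:
    """Hypothetical helper: turn a pulled XCom value into a list of object names.

    Fails with a descriptive error instead of letting None travel further and
    surface later as "'NoneType' object is not subscriptable".
    """
    if value is None:
        raise ValueError(
            "xcom_pull returned None: check the task_id, that the upstream task "
            "ran first, and that its callable returns the list"
        )
    if isinstance(value, str):
        # Rendered as text, a list looks like "['a.csv.gz', 'b.csv.gz']";
        # literal_eval parses it back without executing arbitrary code.
        value = ast.literal_eval(value)
    if not isinstance(value, (list, tuple)) or not all(isinstance(v, str) for v in value):
        raise TypeError(
            f"expected a list of object names, got {type(value).__name__}: {value!r}"
        )
    return list(value)


def check_file_list(ti) -> List[str]:
    """Hypothetical debug callable for a temporary PythonOperator.

    Airflow 2 passes the TaskInstance as `ti` because the name appears in the
    signature; the printed line ends up in the task log.
    """
    raw = ti.xcom_pull(task_ids="get_file_list_click")
    print(f"xcom_pull returned {type(raw).__name__}: {raw!r}")
    return as_object_list(raw)
```

Wired in with something like `task_check = PythonOperator(task_id='check_file_list_click', python_callable=check_file_list)` and `task_get_file_list_click >> task_check >> task_copy_file_list_click`, the task log shows whether the XCom holds a list, a string, or nothing at that point in the run.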