Reputation: 578
I have a bunch of custom operators, and I want to start using XComArg and .output in my tasks.
For example, below I commented out the xcom_push call so that execute returns the list instead:
def execute(self, context):
    # context["ti"].xcom_push(key="extract_list", value=extract_list)
    return extract_list
The issue is that my key has historically been "extract_list", and I have references to that key elsewhere. I also pass other XComs (such as a maximum ID or timestamp), and those are all stored under the key return_value.
Can I change the key of an XCom that I push?
This snippet works, but the key is return_value:
extract = FileToAzureBlobOperator(
    task_id="extract-test",
    remote_directories=["/input/test"],
    subfolders=["test", "raw"],
    params={
        "start": "{{ data_interval_start }}",
        "end": "{{ data_interval_end }}",
    },
)
transform = PrepareParquetOperator(
    task_id="transform-test",
    input_files=extract.output,
    output_folder="test/staging",
    custom_transform_script="scripts.common.test",
    partition_columns=["date_id"],
)
I have tried adding test = XComArg(operator=extract, key="test_key") and then passing input_files=test in my transform task as well, but no luck. I think I need to override the default key inside my FileToAzureBlobOperator.
Upvotes: 0
Views: 1507
Reputation: 578
Here is an example which worked using XComArg. It required changing my DAG file a bit.
from airflow.models.xcom_arg import XComArg

extract = FileToAzureBlobOperator(
    task_id="extract-test",
    remote_directories=["/input/test"],
    subfolders=["test", "raw"],
    params={
        "start": "{{ data_interval_start }}",
        "end": "{{ data_interval_end }}",
    },
)
# Point at the XCom the operator pushes under its custom key.
extracted_files = XComArg(extract, "extract_list")
transform = PrepareParquetOperator(
    task_id="transform",
    input_files=extracted_files,
)
transformed_files = XComArg(transform, "filter_list")
finalize = DatasetToDatasetOperator(
    task_id="finalize",
    input_files=transformed_files,
)
extracted_files >> transformed_files >> finalize
Eventually, my plan is to remove the xcom_push() calls from my operators and return the values directly, so I can use .output without the extra XComArg line. I just need to clean up the references to the custom key names in other areas first.
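While those references are being cleaned up, one transitional option is to do both: keep pushing under the legacy key and also return the list. The sketch below only illustrates the idea and does not import Airflow. FakeTaskInstance, ExtractSketch, run_task, and the file names are hypothetical stand-ins, and run_task assumes that a non-None return value is stored under return_value.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance; it only records pushed XComs."""

    def __init__(self):
        self.xcoms = {}

    def xcom_push(self, key, value):
        self.xcoms[key] = value


class ExtractSketch:
    """Hypothetical operator body; a real one would subclass BaseOperator."""

    def execute(self, context):
        extract_list = ["test/raw/a.csv", "test/raw/b.csv"]  # made-up file names
        # Keep the legacy key so existing references to "extract_list" still resolve.
        context["ti"].xcom_push(key="extract_list", value=extract_list)
        # Also return the list, so it is stored under "return_value" as well,
        # which is the key .output refers to by default.
        return extract_list


def run_task(operator):
    """Mimic the part of a task run that matters here (an assumption, not Airflow code)."""
    ti = FakeTaskInstance()
    result = operator.execute({"ti": ti})
    if result is not None:
        ti.xcom_push(key="return_value", value=result)
    return ti.xcoms


xcoms = run_task(ExtractSketch())
print(sorted(xcoms))  # ['extract_list', 'return_value']
```

With both keys present, old consumers can keep reading "extract_list" while new tasks switch to .output, and the explicit push can be dropped once nothing reads the legacy key.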
Upvotes: 0
Reputation: 20067
I believe you should be able to return the value as a dictionary to get what you want:
return {"extract_list": extract_list}
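One caveat worth checking before relying on this: as far as I understand, a returned dictionary is stored as a single XCom under return_value unless multiple outputs are enabled. The documented switch for that is TaskFlow's multiple_outputs=True, which additionally stores each dictionary key as its own XCom. Whether a custom operator can opt in depends on the Airflow version, so treat that part as an assumption to verify. The toy model below is not Airflow code; store_return and the file name are hypothetical and only show which keys would exist in each case.

```python
def store_return(result, multiple_outputs=False):
    """Toy model of which XCom keys a returned value ends up under (not Airflow code)."""
    stored = {}
    if result is None:
        return stored
    if multiple_outputs:
        # With multiple outputs enabled, each dictionary key becomes its own XCom.
        for key, value in result.items():
            stored[key] = value
    # The whole return value is always stored under the default key.
    stored["return_value"] = result
    return stored


returned = {"extract_list": ["test/raw/a.csv"]}  # made-up file name
plain = store_return(returned)
unrolled = store_return(returned, multiple_outputs=True)

print(list(plain))
print(sorted(unrolled))
```

In the first case, a downstream task would still read return_value and index into the dictionary. Only in the second case would a lookup by the key "extract_list" find an XCom of its own.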
Upvotes: 2