ldacey

Reputation: 578

Change default XComArg key in custom operators

I have a number of custom operators, and I want to start using XComArg and the .output attribute in my tasks.

For example, below I commented out the xcom_push call and return the list instead:

    def execute(self, context):
        # context["ti"].xcom_push(key="extract_list", value=extract_list)
        return extract_list

The issue is that this key has historically been "extract_list", and I reference that key in other places. I also pass other XComs (such as the maximum ID/timestamp), and those are labeled return_value.

Can I change the key of the XCom that gets pushed when I return a value?
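To make the distinction concrete, here is a toy, pure-Python model of the two ways a value ends up in XCom. It is not Airflow's API: `FakeTaskInstance` and `run_task` are hypothetical names. An explicit `xcom_push` stores the value under whatever key you pass, while a returned value is pushed by the task runner under the fixed key `return_value`.

```python
from typing import Any, Callable, Dict, Optional, Tuple

RETURN_KEY = "return_value"  # the key Airflow uses for returned values

Store = Dict[Tuple[str, str], Any]  # (task_id, key) -> value


class FakeTaskInstance:
    """Hypothetical stand-in for the task instance's XCom methods."""

    def __init__(self, task_id: str, store: Store) -> None:
        self.task_id = task_id
        self.store = store

    def xcom_push(self, key: str, value: Any) -> None:
        self.store[(self.task_id, key)] = value

    def xcom_pull(self, task_ids: str, key: str = RETURN_KEY) -> Optional[Any]:
        return self.store.get((task_ids, key))


def run_task(task_id: str, execute: Callable[[dict], Any], store: Store) -> None:
    """Call execute(); this toy model pushes any non-None result under return_value."""
    ti = FakeTaskInstance(task_id, store)
    result = execute({"ti": ti})
    if result is not None:
        ti.xcom_push(key=RETURN_KEY, value=result)


def execute_with_push(context: dict) -> None:
    # Old style: explicit push under a custom key
    context["ti"].xcom_push(key="extract_list", value=["a.csv", "b.csv"])


def execute_with_return(context: dict) -> list:
    # New style: just return the list
    return ["a.csv", "b.csv"]


store: Store = {}
run_task("extract-old", execute_with_push, store)
run_task("extract-new", execute_with_return, store)
print(sorted(store))
# [('extract-new', 'return_value'), ('extract-old', 'extract_list')]
```

In this model, returning a value never produces an `extract_list` key; the key only changes if something calls `xcom_push` with it.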

This snippet works, but the key is return_value:

    extract = FileToAzureBlobOperator(
        task_id="extract-test",
        remote_directories=["/input/test"],
        subfolders=["test", "raw"],
        params={
            "start": "{{ data_interval_start }}",
            "end": "{{ data_interval_end }}",
        },
    )

    transform = PrepareParquetOperator(
        task_id="transform-test",
        input_files=extract.output,
        output_folder="test/staging",
        custom_transform_script="scripts.common.test",
        partition_columns=["date_id"],
    )

I have tried adding test = XComArg(operator=extract, key="test_key") and passing input_files=test to my transform task, but with no luck. I think I need to override the default key inside my FileToAzureBlobOperator.
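One way to see why `key="test_key"` finds nothing: an XComArg is essentially a pointer to (upstream task_id, key) that gets pulled at runtime, so the key only selects what to read; it does not rename what the upstream operator pushes. The toy model below illustrates that, with `FakeXComArg` as a hypothetical stand-in. Its `__getitem__` mirrors the `extract.output["some_key"]` style, which I believe Airflow 2's XComArg also supports, but treat that as an assumption to check against your version.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple

Store = Dict[Tuple[str, str], Any]  # (task_id, key) -> value


@dataclass(frozen=True)
class FakeXComArg:
    """Hypothetical stand-in for XComArg: a (task_id, key) pointer, nothing more."""

    task_id: str
    key: str = "return_value"

    def resolve(self, store: Store) -> Optional[Any]:
        # Like xcom_pull: read whatever the upstream task already pushed
        return store.get((self.task_id, self.key))

    def __getitem__(self, key: str) -> "FakeXComArg":
        # Same upstream task, different key
        return FakeXComArg(self.task_id, key)


# The extract task only *returned* its list, so it lives under return_value
store: Store = {("extract-test", "return_value"): ["a.parquet"]}

default_ref = FakeXComArg("extract-test")               # like extract.output
custom_ref = FakeXComArg("extract-test", "test_key")    # like XComArg(extract, key="test_key")

print(default_ref.resolve(store))  # ['a.parquet']
print(custom_ref.resolve(store))   # None: nothing was pushed under "test_key"
```

So a custom key on the reading side only works if the operator pushes under that same key.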

Upvotes: 0

Views: 1507

Answers (2)

ldacey

Reputation: 578

Here is an example that worked using XComArg. It required a few changes to my DAG file.

    from airflow.models.xcom_arg import XComArg

    extract = FileToAzureBlobOperator(
        task_id="extract-test",
        remote_directories=["/input/test"],
        subfolders=["test", "raw"],
        params={
            "start": "{{ data_interval_start }}",
            "end": "{{ data_interval_end }}",
        },
    )

    # Pull the XCom pushed under the custom "extract_list" key
    extracted_files = XComArg(extract, "extract_list")

    transform = PrepareParquetOperator(
        task_id="transform",
        input_files=extracted_files,
    )

    # Pull the XCom pushed under the custom "filter_list" key
    transformed_files = XComArg(transform, "filter_list")

    finalize = DatasetToDatasetOperator(
        task_id="finalize",
        input_files=transformed_files,
    )

    # Explicit ordering (Airflow can also infer it from XComArgs passed to templated fields)
    extracted_files >> transformed_files >> finalize

Eventually I plan to remove the xcom_push() calls from my operators and return the values directly, so I can use .output without the extra XComArg line. I just need to clean up the references to the custom key names elsewhere first.
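For the transition period, one option is to push the legacy key and also return the value, so old `xcom_pull(key="extract_list")` references and new `.output` references both keep working. This is a toy, pure-Python sketch: `FakeTI`, `LEGACY_KEY`, and the placeholder file list are hypothetical, and the last line imitates what the task runner does with a returned value.

```python
from typing import Any, Dict, List, Tuple

Store = Dict[Tuple[str, str], Any]  # (task_id, key) -> value

LEGACY_KEY = "extract_list"  # old key still referenced elsewhere


class FakeTI:
    """Hypothetical stand-in for the task instance's xcom_push."""

    def __init__(self, task_id: str, store: Store) -> None:
        self.task_id = task_id
        self.store = store

    def xcom_push(self, key: str, value: Any) -> None:
        self.store[(self.task_id, key)] = value


def execute(context: dict) -> List[str]:
    extract_list = ["a.csv", "b.csv"]  # placeholder for the real extraction result
    # Transitional: keep the legacy key for existing references...
    context["ti"].xcom_push(key=LEGACY_KEY, value=extract_list)
    # ...and return the value so return_value (.output) works too
    return extract_list


store: Store = {}
ti = FakeTI("extract-test", store)
result = execute({"ti": ti})
ti.xcom_push(key="return_value", value=result)  # imitates the runner pushing the return value
```

The cost is storing the same value twice; once nothing reads `extract_list` any more, the explicit push can be deleted.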

Upvotes: 0

Jarek Potiuk

Reputation: 20067

I believe you should be able to return the value as a dictionary to get what you want:

    return {"extract_list": extract_list}
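Whether a returned dict gives you a separate `extract_list` key depends on how the return value is pushed. Assuming the operator pushes its return value as a single XCom (as the question describes), the whole dict is stored under `return_value`; a separate key per dict entry is what TaskFlow's `multiple_outputs=True` provides. The toy model below shows both cases. `push_result` is a hypothetical helper, and keeping the full dict under `return_value` in the unpacked case is an assumption of this model.

```python
from typing import Any, Dict, Tuple

Store = Dict[Tuple[str, str], Any]  # (task_id, key) -> value


def push_result(task_id: str, result: Any, store: Store,
                multiple_outputs: bool = False) -> None:
    """Toy model of pushing a task's return value to XCom."""
    if multiple_outputs:
        if not isinstance(result, dict):
            raise TypeError("multiple_outputs expects a dict result")
        # Each dict key becomes its own XCom key
        for key, value in result.items():
            store[(task_id, key)] = value
    # In this toy model the full result is always kept under return_value
    store[(task_id, "return_value")] = result


result = {"extract_list": ["a.csv", "b.csv"]}

plain: Store = {}
push_result("extract-test", result, plain)
print(sorted(plain))  # [('extract-test', 'return_value')]

unpacked: Store = {}
push_result("extract-test", result, unpacked, multiple_outputs=True)
print(sorted(unpacked))  # [('extract-test', 'extract_list'), ('extract-test', 'return_value')]
```

For a classic custom operator, the equivalent of the unpacked case would be looping over the dict in execute() and calling context["ti"].xcom_push(key=..., value=...) for each entry before returning it.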

Upvotes: 2
