vdolez

Reputation: 1118

Passing return value from operator to following operator in Airflow

I'm trying to pass a list of strings to the source_objects field of GoogleCloudStorageToBigQueryOperator, but with the following code I get an error:

string indices must be integers, not unicode

Things I don't know :

Things I thought of :

The thing I want to do :

Also, it seems that some of the operator's fields use a feature called template_fields. What's the mechanism behind template fields? Isn't it just for PythonOperator and BashOperator?

And a last one: why doesn't PythonOperator return a TaskInstance?

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        fileName = kwargs['dag_run'].conf['fileName']
        return [fileName]

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        #source_objects=['data.csv'], 
        source_objects=get_file_name.xcom_pull(context='', task_ids='get_file_name'), 
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_EMPTY')

    bq_load.set_upstream(get_file_name)

I'm kind of new to Python and Airflow. These kinds of things are supposed to be trivial I guess. I'm sure there's something I misunderstood here.

Upvotes: 3

Views: 13190

Answers (2)

Vzzarr

Reputation: 5660

@vdolez's answer was really helpful. Adding on top of that: because what is being shared is a Jinja template, it is possible to access the fields of the result within the Jinja template itself.

For example, in my case I had to return two values from the upstream task, so a tuple made sense to me. The way to access a field of the tuple I'm passing is the following:

"{{ task_instance.xcom_pull(task_ids='get_file_name')[0] }}"

where [0], used to access the first element of the tuple, goes inside the Jinja template.

This logic can of course be extended to any Python data type (dictionary, list, etc.).

I'm adding this in case you need to pass more complex results; this is the way to access them, building on @vdolez's answer.
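As a minimal sketch of the idea, the snippet below renders such templates with plain Jinja2 outside Airflow. FakeTaskInstance, the task ids and the values it returns are hypothetical stand-ins for the real task instance and XCom store; only the indexing inside the template is the point.

```python
from jinja2 import Template


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (not the real class)."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        # Return whatever the named upstream task "returned".
        return self._xcoms[task_ids]


ti = FakeTaskInstance({
    'get_tuple': ('data.csv', 'project:dataset.table'),
    'get_dict': {'file_name': 'data.csv'},
})

# Index into a tuple inside the template.
first_of_tuple = Template(
    "{{ task_instance.xcom_pull(task_ids='get_tuple')[0] }}"
).render(task_instance=ti)

# Look up a key in a dictionary inside the template.
value_of_dict = Template(
    "{{ task_instance.xcom_pull(task_ids='get_dict')['file_name'] }}"
).render(task_instance=ti)
```

In both cases the subscript is evaluated by Jinja before the operator ever sees the value, so the operator receives only the selected element as a string.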

Upvotes: 1

vdolez

Reputation: 1118

After many tests, I came up with this solution, thanks to tobi6 for his comment which pointed me in the right direction. I had to use the template_fields feature.

When I tried to return a list with a single string, I had concatenation errors, so I had to return a single string in my XCom and then surround the template call to XCom with brackets to make the result a list.
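For context, here is a rough sketch of how template fields seem to work: an operator class lists the attribute names that should be treated as templates in template_fields, and those attributes are rendered with Jinja just before the task executes, descending into lists. The feature is not limited to PythonOperator and BashOperator. FakeOperator, render_field and render_templates below are hypothetical stand-ins written against plain Jinja2, not Airflow's actual code.

```python
from jinja2 import Template


class FakeOperator:
    """Hypothetical operator that declares one templated attribute."""

    template_fields = ('source_objects',)

    def __init__(self, source_objects):
        self.source_objects = source_objects


def render_field(value, context):
    """Render strings with Jinja and recurse into lists; leave other types alone."""
    if isinstance(value, str):
        return Template(value).render(**context)
    if isinstance(value, list):
        return [render_field(item, context) for item in value]
    return value


def render_templates(operator, context):
    # Only attributes named in template_fields are rendered.
    for name in operator.template_fields:
        setattr(operator, name, render_field(getattr(operator, name), context))


op = FakeOperator(source_objects=["{{ file_name }}"])
render_templates(op, {'file_name': 'data.csv'})
rendered_objects = op.source_objects

# A rendered template is always a string, even when the value is a list.
rendered_list = render_field("{{ names }}", {'names': ['data.csv']})
```

The list in source_objects stays a list and only the string inside it is rendered, which is consistent with putting the brackets around the template rather than returning a list from the XCom.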

Here's the final code :

with DAG('bq_load_file_from_cloud_function', default_args=default_args) as dag:

    def get_file_name_from_conf(ds, **kwargs):
        return kwargs['dag_run'].conf['fileName']

    get_file_name = PythonOperator(
        task_id='get_file_name',
        provide_context=True,
        python_callable=get_file_name_from_conf)

    bq_load = GoogleCloudStorageToBigQueryOperator(
        task_id='bq_load', 
        bucket='src_bucket', 
        source_objects=["{{ task_instance.xcom_pull(task_ids='get_file_name') }}"],
        destination_project_dataset_table='project:dataset.table', 
        write_disposition='WRITE_APPEND')

    bq_load.set_upstream(get_file_name)
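As for the error in the question, a likely cause is the call get_file_name.xcom_pull(context='', ...). It runs while the DAG file is being parsed, before any task has executed, and as far as I can tell the operator-level xcom_pull looks up context['ti'], which fails when context is a string. The function below is a simplified, hypothetical stand-in for that lookup, runnable with plain Python:

```python
def operator_xcom_pull(context, task_ids):
    # Simplified stand-in: the task instance is expected under the 'ti' key
    # of the context dictionary that Airflow supplies at run time.
    return context['ti'].xcom_pull(task_ids=task_ids)


try:
    operator_xcom_pull(context='', task_ids='get_file_name')
except TypeError as exc:
    # Indexing the string '' with the key 'ti' raises TypeError.
    error_message = str(exc)
```

Using the template instead defers the XCom lookup until the task runs, when a real context and task instance exist.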

Upvotes: 4
