frankencat

Reputation: 11

Why does a PythonOperator callable not need to accept parameters in Airflow?

I do not understand how the parameter list of a callable (the function passed as python_callable to a PythonOperator) in Airflow should be defined. I have seen callables with no parameters, with named parameters, or with **kwargs. It seems I can always add "ti" or **allargs as parameters, and ti is used for task instance info, or ds for the execution date. But apparently my callables do not NEED parameters; they can simply be "def function():". If I wrote a regular Python function func() instead of func(**kwargs), it would fail at runtime if it were called with any arguments. Airflow seems to pass ti all the time, so how can the callable's signature not require it? The example below is from a training site: _process_data gets ti, but _extract_bitcoin_price() does not. I thought that was because of the XCom push, but ti always seems to be available, so how can "def somefunc()" ever work? I tried reading the PythonOperator source code, but I am unclear how this works or what the best practice is for including parameters in a callable. Thanks!!

from airflow import DAG
from airflow.operators.python import PythonOperator

from datetime import datetime
import json
from typing import Dict
import requests
import logging

API = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin&vs_currencies=usd&include_market_cap=true&include_24hr_vol=true&include_24hr_change=true&include_last_updated_at=true"

def _extract_bitcoin_price():
    return requests.get(API).json()['bitcoin']

def _process_data(ti):
    response = ti.xcom_pull(task_ids='extract_bitcoin_price')
    logging.info(response)
    processed_data = {'usd': response['usd'], 'change': response['usd_24h_change']}
    ti.xcom_push(key='processed_data', value=processed_data)

def _store_data(ti):
    data = ti.xcom_pull(task_ids='process_data', key='processed_data')
    logging.info(f"Store: {data['usd']} with change {data['change']}")

with DAG('classic_dag', schedule_interval='@daily', start_date=datetime(2021, 12, 1), catchup=False) as dag:

    extract_bitcoin_price = PythonOperator(
        task_id='extract_bitcoin_price',
        python_callable=_extract_bitcoin_price
    )

    process_data = PythonOperator(
        task_id='process_data',
        python_callable=_process_data
    )

    store_data = PythonOperator(
        task_id='store_data',
        python_callable=_store_data
    )

    extract_bitcoin_price >> process_data >> store_data

I tried a callable with no parameters, somefunc(), expecting an error about unexpected arguments, but it succeeded. Defining somefunc(ti) also works! How can both work?

Upvotes: 1

Views: 5196

Answers (1)

Elad Kalif

Reputation: 15931

I think what you are missing is that Airflow passes the task's context to the Python callable (ti is one of the context variables). These are additional, useful parameters that Airflow provides, and you can use them in your task.

In older Airflow versions (1.x), the user had to set provide_context=True for that to work:

process_data = PythonOperator(
    ...,
    provide_context=True
)

Since Airflow>=2.0 there is no need to use provide_context. Airflow handles it under the hood.

When you see a python_callable signature like:

def func(ti, **kwargs):
    ...

it means that ti is "unpacked" from the keyword arguments by name, and the remaining context variables end up in kwargs. Equivalently, you can write:

def func(**kwargs):
    ti = kwargs['ti']

EDIT: I think what you are missing is that although you write:

def func():
    ...

store_data = PythonOperator(
    task_id='task',
    python_callable=func
)

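Airflow does more than just call func. What actually runs is the execute() method of PythonOperator, which calls the python_callable you provided with op_args and keyword arguments. In Airflow 2.x it first inspects the callable's signature: if the callable accepts **kwargs it receives the whole context; otherwise only the context variables whose names match its parameters (such as ti or ds) are passed. That is why both def func() and def func(ti) work.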
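Airflow 2.x's PythonOperator inspects the callable's signature before calling it: a callable with **kwargs gets the whole context, any other callable gets only the context keys matching its named parameters. Below is a simplified sketch of that dispatch in plain Python. call_with_context is a hypothetical helper, not an Airflow API (the real logic lives in Airflow's internals and also handles op_args and op_kwargs), and the context dict is a placeholder for the real task context.

```python
import inspect

def call_with_context(func, context):
    """Hypothetical helper: call func with only the context keys it accepts."""
    params = inspect.signature(func).parameters.values()
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params):
        # func declares **kwargs: hand over the whole context
        return func(**context)
    # otherwise pass only the keys that match a named parameter
    wanted = {p.name for p in params}
    return func(**{k: v for k, v in context.items() if k in wanted})

def no_params():
    return "no params"

def only_ti(ti):
    return f"got {ti}"

def everything(**kwargs):
    return sorted(kwargs)

context = {"ti": "fake-task-instance", "ds": "2021-12-01"}

print(call_with_context(no_params, context))   # no params
print(call_with_context(only_ti, context))     # got fake-task-instance
print(call_with_context(everything, context))  # ['ds', 'ti']

# Calling the function directly with the whole context fails,
# which is the error the question expected:
try:
    no_params(**context)
except TypeError as exc:
    print("TypeError:", exc)
```

The filtering step is what lets a zero-argument callable coexist with one that asks for ti: neither ever sees arguments it did not declare.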

Upvotes: 3
