Josko de Boer

Reputation: 45

Runtime configuration to PythonOperator

I can't find any working documentation on how to use the JSON variables passed with "-c" for e.g. a backfill job.

I've been printing my Python task's **kwargs (with provide_context=True) to find out, but I still can't work out where these variables end up.

Can anyone point me in the right direction?

So, what I want to do:

airflow backfill mydag -c '{"override":"yes"}' -s 2018-12-01 -e 2018-12-12

I have a PythonOperator:

PythonOperator(
    task_id = 'task_identifier',
    python_callable = run_task,
    op_kwargs = {
        'task_conf': task_config 
    },
    provide_context=True,
    dag = this_dag
)

Within run_task, I would like to access the override variable:

def run_task(*args, **kwargs): 

    dag_run = kwargs.get('dag_run')
    logging.info(kwargs['dag_run'].conf.get('override'))

But I can't find a way to access this override variable; the task fails with:

[2018-12-17 10:07:24,649] {models.py:1760} ERROR - 'NoneType' object has no attribute 'get'
Traceback (most recent call last):
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/daimonie/airflow/dags/dag_scheduled_queries.py", line 65, in run_query
    logging.info(kwargs['dag_run'].conf.get('override'))

Edit: I did find a config setting and description that seem to indicate that these parameters need to already be set in the code:

# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params=True
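
My reading of that comment, as a rough sketch rather than Airflow's actual code: the params defined in the DAG act as defaults, and any keys passed with -c replace them. The function name effective_params and the sample values are made up for illustration.

```python
def effective_params(params, dag_run_conf, overrides_enabled=True):
    """Return a copy of params with dag_run.conf keys layered on top when enabled."""
    merged = dict(params)  # copy, so the DAG-level defaults are not mutated
    if overrides_enabled and dag_run_conf:
        merged.update(dag_run_conf)
    return merged


# Hypothetical defaults set in the DAG file, then overridden by -c '{"override":"yes"}'
defaults = {"override": "no", "limit": 10}
print(effective_params(defaults, {"override": "yes"}))
print(effective_params(defaults, None))  # no -c given: defaults survive unchanged
```

If this reading is right, the task would read the value from kwargs['params'] rather than from dag_run.conf directly.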

The donot_pickle parameter was set to False.

Upvotes: 2

Views: 5241

Answers (4)

mptrossbach

Reputation: 384

For anyone wondering, the reason for the error

AttributeError: 'NoneType' object has no attribute 'get'

when trying to retrieve a key/value from the conf attribute of dag_run is that the conf argument passed to airflow backfill mydag -c needs to be structured as:

'{"conf": {"key1": "value", ...}}'

not

'{"key1": "value", ...}'

Then you can reference it from the context, e.g.

def do_something(**context):
    value = context['dag_run'].conf['key1']
    ...

Upvotes: 4

alexey

Reputation: 1

For those who, like me, wandered here looking for a fix for the same error (ERROR - 'NoneType' object has no attribute 'get') when reading kwargs['dag_run'].conf.get('something'):

By default, dag_run.conf is not set (it is None) when the DAG is run by the scheduler, which is why the error appears. So if you occasionally need to run a scheduled DAG manually with custom parameters, something like this will work.

In the callable function:

def run_task(**kwargs):
    if kwargs['dag_run'].conf:
        my_parameter = kwargs['dag_run'].conf.get('my_parameter', None)
    else:
        my_parameter = "some_default_value"
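
The same check can be folded into a small helper so every task reads its parameters the same way. This is only a sketch: conf_value is a hypothetical name, and the SimpleNamespace objects stand in for Airflow's real dag_run so the snippet runs without Airflow installed.

```python
from types import SimpleNamespace


def conf_value(context, key, default=None):
    """Read key from dag_run.conf, tolerating a missing dag_run or a conf of None."""
    dag_run = context.get("dag_run")
    conf = getattr(dag_run, "conf", None) or {}
    return conf.get(key, default)


# Stand-ins for a scheduled run (no conf) and a manual run started with -c
scheduled = {"dag_run": SimpleNamespace(conf=None)}
manual = {"dag_run": SimpleNamespace(conf={"my_parameter": "custom"})}

print(conf_value(scheduled, "my_parameter", "some_default_value"))
print(conf_value(manual, "my_parameter", "some_default_value"))
```

Unlike the if/else above, this also falls back to the default when conf is set but the key is missing.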

Upvotes: 0

kaxil

Reputation: 18844

You can pass parameters from the CLI using -c '{"key":"value"}' and then use them in the Python callable function as dag_run.conf["key"].

In your case,

Operator:

PythonOperator(
    task_id = 'task_identifier',
    python_callable = run_task,
    provide_context=True,
    dag = this_dag
)

Callable Function:

def run_task(**kwargs):
    print(kwargs["dag_run"].conf["override"])

Upvotes: 0

sdvd

Reputation: 443

You need to do two things.

  1. Replace kwargs['run_dag'].conf.get('override') with kwargs['dag_run'].conf.get('override').
  2. Also change the signature from def run_task(*args, **kwargs): to def run_task(**kwargs):.
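
To see why the keyword-only signature is enough: the traceback in the question shows the operator calling self.python_callable(*self.op_args, **self.op_kwargs), and with provide_context=True the context arrives through those keyword arguments. The sketch below imitates that call with a stand-in object; FakeDagRun and the sample context are hypothetical, not Airflow classes.

```python
class FakeDagRun:
    """Stand-in for Airflow's DagRun; only the conf attribute is modelled."""

    def __init__(self, conf):
        self.conf = conf


def run_task(**kwargs):
    # 'dag_run' (not 'run_dag') is the key under which the run is passed in
    dag_run = kwargs["dag_run"]
    return dag_run.conf.get("override")


# Imitates the operator's call: python_callable(**op_kwargs)
context = {"dag_run": FakeDagRun({"override": "yes"}), "ds": "2018-12-01"}
print(run_task(**context))
```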

Upvotes: 0
