Reputation: 45
I can't find any working documentation on how to use the JSON variables passed with "-c" to, e.g., a backfill job.
I've been printing my Python task's **kwargs (with provide_context=True) to find out, but I still can't work it out.
Can anyone point me in the right direction?
So, what I want to do:
airflow backfill mydag -c '{"override":"yes"}' -s 2018-12-01 -e 2018-12-12
I have a PythonOperator:
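from airflow.operators.python_operator import PythonOperator

PythonOperator(
    task_id='task_identifier',
    python_callable=run_task,  # pass the function object, not the string 'run_task'
    op_kwargs={
        'task_conf': task_config
    },
    provide_context=True,
    dag=this_dag
)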
Within run_task, I would like to access the override variable:
import logging

def run_task(*args, **kwargs):
    dag_run = kwargs.get('dag_run')
    logging.info(kwargs['dag_run'].conf.get('override'))
But I can't find a way to access this override variable; the task fails with:
[2018-12-17 10:07:24,649] {models.py:1760} ERROR - 'NoneType' object has no attribute 'get'
Traceback (most recent call last):
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 95, in execute
    return_value = self.execute_callable()
  File "/home/daimonie/.local/lib/python3.6/site-packages/airflow/operators/python_operator.py", line 100, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/daimonie/airflow/dags/dag_scheduled_queries.py", line 65, in run_query
    logging.info(kwargs['dag_run'].conf.get('override'))
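The error message says the object on which .get is called is None, which means kwargs['dag_run'] exists but its conf attribute is None. A minimal reproduction without Airflow, using types.SimpleNamespace as a hypothetical stand-in for the real DagRun object (only its conf attribute matters here):

```python
from types import SimpleNamespace

# Hypothetical stand-in for Airflow's DagRun; only `conf` is modelled.
scheduled_run = SimpleNamespace(conf=None)

def run_task(**kwargs):
    # Same expression as in the question's callable.
    return kwargs['dag_run'].conf.get('override')

try:
    run_task(dag_run=scheduled_run)
except AttributeError as exc:
    print(exc)  # 'NoneType' object has no attribute 'get'
```

With conf={'override': 'yes'} instead of None, the same call returns 'yes'.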
Edit: I did find a config setting and description that seem to indicate that these parameters need to be set in the code already:
# Whether to override params with dag_run.conf. If you pass some key-value pairs through `airflow backfill -c` or
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params=True
The donot_pickle parameter was set to False.
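The dag_run_conf_overrides_params comment describes a merge: key/value pairs from dag_run.conf are laid over the task's params. A simplified sketch of that behaviour, assuming a plain dict update (overwrite_params and the "table" key are hypothetical illustrations, not Airflow's actual code):

```python
def overwrite_params(params, dag_run_conf):
    """Return a copy of params with dag_run_conf merged on top (hypothetical helper)."""
    merged = dict(params)          # leave the DAG-level defaults untouched
    if dag_run_conf:               # None or {} leaves params unchanged
        merged.update(dag_run_conf)
    return merged

default_params = {"override": "no", "table": "events"}
print(overwrite_params(default_params, {"override": "yes"}))
# {'override': 'yes', 'table': 'events'}
print(overwrite_params(default_params, None))
# {'override': 'no', 'table': 'events'}
```

Under that reading, defining a default in params means the callable always has a value to read, whether or not -c was supplied.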
Upvotes: 2
Views: 5241
Reputation: 384
For anyone wondering, the reason for the error
AttributeError: 'NoneType' object has no attribute 'get'
when trying to retrieve a key/value from the conf attribute of dag_run is that the conf argument passed to airflow backfill mydag -c needs to be formatted as:
'{"conf": {"key1": "value", ...}}'
not
'{"key1": "value", ...}'
Then you can reference it from the context, e.g.
def do_something(**context):
    value = context['dag_run'].conf['key']
    ...
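The two -c strings in this answer parse to differently shaped dictionaries. If you are unsure which shape reaches dag_run.conf in your Airflow version, a defensive lookup can check the top level first and then a nested "conf" key. This is a sketch; get_conf_value is a hypothetical helper, not part of Airflow's API:

```python
import json

flat = json.loads('{"key1": "value"}')
nested = json.loads('{"conf": {"key1": "value"}}')

def get_conf_value(conf, key, default=None):
    """Read key from a dag_run.conf-style dict, tolerating None and one 'conf' level."""
    if not conf:                   # scheduler-created runs may have conf=None
        return default
    if key in conf:                # flat shape: {"key1": ...}
        return conf[key]
    inner = conf.get("conf")       # nested shape: {"conf": {"key1": ...}}
    if isinstance(inner, dict):
        return inner.get(key, default)
    return default

print(get_conf_value(flat, "key1"))               # value
print(get_conf_value(nested, "key1"))             # value
print(get_conf_value(None, "key1", "fallback"))   # fallback
```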
Upvotes: 4
Reputation: 1
For those who, like me, have wandered here looking for a fix for the same error (ERROR - 'NoneType' object has no attribute 'get') when trying to read kwargs['dag_run'].conf.get('something'):
By default, dag_run.conf is not set (it is None) when the DAG is run by the scheduler. That's why the error appears.
So, if you occasionally need to run a scheduled DAG manually with custom parameters, something like this will work in the callable function:
def run_task(**kwargs):
    if kwargs['dag_run'].conf:
        my_parameter = kwargs['dag_run'].conf.get('my_parameter', None)
    else:
        my_parameter = "some_default_value"
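The same fallback can be written more compactly by treating a missing conf as an empty dict. A sketch, again using types.SimpleNamespace as a hypothetical stand-in for the DagRun object found in the context:

```python
from types import SimpleNamespace

def run_task(**kwargs):
    # dag_run.conf is None for scheduler-created runs, so fall back to {}.
    conf = kwargs['dag_run'].conf or {}
    return conf.get('my_parameter', "some_default_value")

print(run_task(dag_run=SimpleNamespace(conf=None)))                     # some_default_value
print(run_task(dag_run=SimpleNamespace(conf={'my_parameter': 'abc'})))  # abc
```

One difference from the if/else version: when conf is present but lacks my_parameter, this returns the default instead of None.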
Upvotes: 0
Reputation: 18844
You can pass parameters from the CLI using -c '{"key":"value"}' and then read them in the Python callable as dag_run.conf["key"].
In your case:
Operator:
PythonOperator(
    task_id='task_identifier',
    python_callable=run_task,  # the function object, not the string 'run_task'
    provide_context=True,
    dag=this_dag
)
Callable Function:
def run_task(**kwargs):
    print(kwargs["dag_run"].conf["override"])
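Hand-quoting JSON inside a shell command is easy to get wrong. A sketch that builds the backfill command from the question as an argument list, serializing the conf dict with json.dumps; it assumes the Airflow 1.x `airflow backfill` form used in the question and does not run Airflow itself:

```python
import json
import shlex

conf = {"override": "yes"}
argv = [
    "airflow", "backfill", "mydag",
    "-c", json.dumps(conf),   # valid JSON for the -c option
    "-s", "2018-12-01",
    "-e", "2018-12-12",
]

# Shell-safe rendering of the same command, for pasting into a terminal.
print(" ".join(shlex.quote(arg) for arg in argv))
# airflow backfill mydag -c '{"override": "yes"}' -s 2018-12-01 -e 2018-12-12

# subprocess.run(argv, check=True) would run it with no shell quoting involved.
```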
Upvotes: 0
Reputation: 443
You need to do two things:
1. Replace kwargs['run_dag'].conf.get('override') with kwargs['dag_run'].conf.get('override').
2. Change def run_task(*args, **kwargs): to def run_task(**kwargs):
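The traceback in the question shows the callable being invoked as python_callable(*self.op_args, **self.op_kwargs), so the context entries arrive as keyword arguments and **kwargs alone receives them; a misspelled key is simply absent. A sketch with a hypothetical plain-dict context and a SimpleNamespace stand-in for DagRun:

```python
from types import SimpleNamespace

def run_task(**kwargs):
    # **kwargs collects every context entry passed by keyword.
    return kwargs['dag_run'].conf.get('override')

def run_task_typo(**kwargs):
    return kwargs['run_dag'].conf.get('override')  # misspelled key

# Hypothetical stand-in for the context passed when provide_context=True.
context = {"dag_run": SimpleNamespace(conf={"override": "yes"}), "ds": "2018-12-01"}
print(run_task(**context))  # yes

try:
    run_task_typo(**context)
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: 'run_dag'
```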
Upvotes: 0