Reputation: 1424
When I trigger my DAG, I pass a bunch of extra parameters in dag_run.conf['metadata'],
so my trigger event looks like this:
{'bucket': 'blah-blah',
 'contentType': 'text/json',
 'crc32c': '375Jog==',
 'customTime': '1970-01-01T00:00:00.000Z',
 'etag': 'CJCqi+DTrO0CEAk=',
 'eventBasedHold': False,
 'generation': '1606821286696208',
 'id': 'xxx',
 'kind': 'storage#object',
 'md5Hash': 'xxxx',
 'mediaLink': 'xxxx',
 'metadata': {'url': 'xxx',
              'date_extracted': '20201115',
              'file_type': 'xxxx',
              'filename': 'xxxx.json',
              'row_count': '30',
              'time_extracted': '063013'},
}
I have a Python function that runs as the on_failure_callback,
but the context there is totally different from the dag_run context.
The context passed to the function on failure is:
{'conf': <airflow.configuration.AutoReloadableProxy object at 0x7fa275de3c18>,
 'dag': <DAG: my_dag>,
 'ds': '2020-12-09',
 'next_ds': '2020-12-09',
 'next_ds_nodash': '20201209',
 ...}
Is there a way to make dag_run.conf['metadata'] available in the callback's context?
I have tried using functools.partial, but "{{ dag_run.conf['metadata'] }}" is passed
through as a literal string rather than rendered.
My DataflowTemplateOperator looks like this:
DataflowTemplateOperator(
    task_id="df_task1",
    job_name="df-{{ dag_run.conf['trace_id'] }}-{{ dag_run.conf['file_type'] }}",
    template="gs://dataflow/my-df-job",
    on_failure_callback=partial(task_fail_slack_alert, "{{ dag_run.conf['metadata'] }}"),
    parameters={
        "filePath": "{{ dag_run.conf['file_name'] }}",
        "traceId": "{{ dag_run.conf['trace_id'] }}",
    },
)
My callable function just prints out for now:
def task_fail_slack_alert(dag_run, context):
    print("payload {}".format(context))
    print("dag_run {}".format(dag_run))
Log:
INFO - payload {'conf': <airflow.configuration.AutoReloadableProxy object at 0x7fa275de3c18>...}
INFO - dag_run {{ dag_run.conf['metadata'] }}
Upvotes: 1
Views: 721
Reputation: 15931
You can't use {{ dag_run.conf['metadata'] }} like that: Jinja templates are only
rendered for fields listed in an operator's template_fields, and on_failure_callback
isn't one of them, so partial ends up binding the literal string.
You can access the conf from the context that Airflow passes to the callback:
def task_fail_slack_alert(context):
    # Airflow passes the full task context to on_failure_callback,
    # including the DagRun, so the trigger conf is reachable from it.
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)
    print(dag_run.conf.get('metadata'))
    print(context.get('exception'))
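Then wire the callback into the operator directly, without partial: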
DataflowTemplateOperator(
    task_id="df_task1",
    job_name="df-{{ dag_run.conf['trace_id'] }}-{{ dag_run.conf['file_type'] }}",
    template="gs://dataflow/my-df-job",
    on_failure_callback=task_fail_slack_alert,
    parameters={
        "filePath": "{{ dag_run.conf['file_name'] }}",
        "traceId": "{{ dag_run.conf['trace_id'] }}",
    },
)
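Side note: functools.partial does work with on_failure_callback, but only for binding
plain Python values; a Jinja string will never be rendered there, since the callback is
not a template field. A minimal sketch, where the channel argument is just an
illustrative extra parameter, not something your DAG needs:
from functools import partial

def task_fail_slack_alert(channel, context):
    # channel is bound below via partial; Airflow supplies context
    # as the remaining positional argument when the task fails.
    dag_run = context.get('dag_run')
    print(channel, dag_run.conf.get('metadata'))

# in the operator:
# on_failure_callback=partial(task_fail_slack_alert, "#data-alerts")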
Upvotes: 3