Altons

Reputation: 1424

Pass extra arguments to DataflowTemplateOperator context

When I trigger my DAG, I pass a bunch of extra parameters via dag_run.conf['metadata'], so my trigger event looks like this:

{'bucket': 'blah-blah',
 'contentType': 'text/json',
 'crc32c': '375Jog==',
 'customTime': '1970-01-01T00:00:00.000Z',
 'etag': 'CJCqi+DTrO0CEAk=',
 'eventBasedHold': False,
 'generation': '1606821286696208',
 'id': 'xxx',
 'kind': 'storage#object',
 'md5Hash': 'xxxx',
 'mediaLink': 'xxxx',
 'metadata': {'url': 'xxx',
              'date_extracted': '20201115',
              'file_type': 'xxxx',
              'filename': 'xxxx.json',
              'row_count': '30',
              'time_extracted': '063013'}}
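
A conf payload like this can be supplied at trigger time, e.g. via the CLI (Airflow 2.x syntax; my_dag stands in for the real DAG id, and the payload is abbreviated):

airflow dags trigger my_dag --conf '{"metadata": {"filename": "xxxx.json", "row_count": "30"}}'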

I have a Python function that runs as the on_failure_callback, but the context passed to it is completely different from the dag_run context.

The context passed to the function on failure is:

{'conf': <airflow.configuration.AutoReloadableProxy object at 0x7fa275de3c18>,
 'dag': <DAG: my_dag>,
 'ds': '2020-12-09',
 'next_ds': '2020-12-09',
 'next_ds_nodash': '20201209',
 ...}

Is there a way to pass dag_run.conf['metadata'] into this callback context?

I have tried using functools.partial, but "{{ dag_run.conf['metadata'] }}" is passed through as a literal string.
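
A quick way to see why, outside Airflow entirely: partial() bakes in the raw string, and since on_failure_callback is not a templated field, nothing ever renders the Jinja expression (demo_alert is a stand-in for the real callback):

from functools import partial

def demo_alert(dag_run, context):
    print("dag_run {}".format(dag_run))

# partial() captures the literal Jinja string; it is never rendered
cb = partial(demo_alert, "{{ dag_run.conf['metadata'] }}")
cb(context={})  # prints: dag_run {{ dag_run.conf['metadata'] }}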

My DataflowTemplateOperator looks like this:

from functools import partial

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplateOperator

DataflowTemplateOperator(
    task_id="df_task1",
    job_name="df-{{ dag_run.conf['trace_id'] }}-{{ dag_run.conf['file_type'] }}",
    template="gs://dataflow/my-df-job",
    # the Jinja string below reaches the callback unrendered
    on_failure_callback=partial(task_fail_slack_alert, "{{ dag_run.conf['metadata'] }}"),
    parameters={
        "filePath": "{{ dag_run.conf['file_name'] }}",
        "traceId": "{{ dag_run.conf['trace_id'] }}",
    },
)

My callable function just prints things out for now:

def task_fail_slack_alert(dag_run, context):
    print("payload {}".format(context))
    print("dag_run {}".format(dag_run))

Log:

INFO - payload {'conf': <airflow.configuration.AutoReloadableProxy object at 0x7fa275de3c18>...}
INFO - dag_run {{ dag_run.conf['metadata'] }}

Upvotes: 1

Views: 721

Answers (1)

Elad Kalif

Reputation: 15931

You can't use {{ dag_run.conf['metadata'] }} like that: on_failure_callback is not a templated field, so the Jinja string is never rendered. Instead, read everything you need from the context that Airflow passes to the callback:

def task_fail_slack_alert(context):
    # the failure callback receives the full task context, including the DagRun
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print(task_instances)
    # dag_run.conf is the payload the DAG was triggered with
    print(dag_run.conf.get('metadata'))
    print(context.get('exception'))

DataflowTemplateOperator(
    task_id="df_task1",
    job_name="df-{{ dag_run.conf['trace_id'] }}-{{ dag_run.conf['file_type'] }}",
    template="gs://dataflow/my-df-job",
    on_failure_callback=task_fail_slack_alert,
    parameters={
        "filePath": "{{ dag_run.conf['file_name'] }}",
        "traceId": "{{ dag_run.conf['trace_id'] }}",
    },
)
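
If you want the callback to actually post to Slack, here is a minimal sketch; the SLACK_WEBHOOK_URL environment variable and the message shape are illustrative, not part of the question:

import os

import requests

def task_fail_slack_alert(context):
    dag_run = context.get('dag_run')
    metadata = dag_run.conf.get('metadata', {})
    text = "Task {} failed for file {} (trace {})".format(
        context.get('task_instance'),
        metadata.get('filename'),
        dag_run.conf.get('trace_id'),
    )
    # SLACK_WEBHOOK_URL is a hypothetical env var holding an incoming-webhook URL
    requests.post(os.environ["SLACK_WEBHOOK_URL"], json={"text": text})

The callback can also be attached once for every task in the DAG via default_args={'on_failure_callback': task_fail_slack_alert} instead of per operator.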

Upvotes: 3
