Reputation: 1045
I'm currently following the template given here: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py to create a DAG that launches an EMR cluster and runs a job on it with spark-submit. When setting up SPARK_TEST_STEPS, I need to include variables passed in from a POST JSON body to fill in the spark-submit arguments, like below:
# Desired shape of the EMR step list. Every element of 'Args' must be
# separated by a comma; without them the list literal is a syntax error.
SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                # Values taken from the configuration of the triggering DAG
                # run. NOTE(review): `kwargs` is not defined at module level;
                # it is presumably only available inside a task callable that
                # receives the Airflow context.
                kwargs['dag_run'].conf['var_1'],
                kwargs['dag_run'].conf['var_2'],
                '10',
            ],
        },
    },
]
How can I pass in the variables given by the POST JSON body while still following the format given in the GitHub link, so that the DAG looks like the one below?
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.operators.python_operator import PythonOperator
# Arguments handed to the DAG below as its `default_args`.
DEFAULT_ARGS = dict(
    owner='Airflow',
    depends_on_past=False,
    start_date=airflow.utils.dates.days_ago(2),
    email=['[email protected]'],
    email_on_failure=False,
    email_on_retry=False,
)

# Scheduled with the cron expression '0 3 * * *'; each run is given a
# two-hour timeout.
dag = DAG(
    'emr_job_flow_manual_steps_dag',
    schedule_interval='0 3 * * *',
    dagrun_timeout=timedelta(hours=2),
    default_args=DEFAULT_ARGS,
)

# Module-level placeholders; define_param() rebinds all three.
var_1 = var_2 = ''
SPARK_TEST_STEPS = []
def define_param(**kwargs):
    """Build the EMR step list from the triggering DAG run's configuration.

    Reads ``var_1`` and ``var_2`` from ``kwargs['dag_run'].conf``, converts
    both to strings and places them in the ``Args`` of a single
    ``calculate_pi`` step.

    The module-level names ``var_1``, ``var_2`` and ``SPARK_TEST_STEPS`` are
    rebound as a side effect, as in the original implementation; callers
    should rely on the return value instead.

    Args:
        **kwargs: Keyword arguments that must include ``dag_run``, an object
            whose ``conf`` mapping holds ``var_1`` and ``var_2``.

    Returns:
        list: A one-element list containing the step definition dict.

    Raises:
        KeyError: If ``dag_run`` is absent from ``kwargs`` or ``var_1`` /
            ``var_2`` is absent from a dict ``conf``.
    """
    global var_1, var_2, SPARK_TEST_STEPS

    run_conf = kwargs['dag_run'].conf
    # Stringify once and reuse the converted values in the step arguments
    # (the original computed them but then inserted the raw conf values).
    var_1 = str(run_conf['var_1'])
    var_2 = str(run_conf['var_2'])

    SPARK_TEST_STEPS = [
        {
            'Name': 'calculate_pi',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    '/usr/lib/spark/bin/run-example',
                    'SparkPi',
                    var_1,
                    var_2,
                    '10',
                ],
            },
        },
    ]
    return SPARK_TEST_STEPS
# Task that runs define_param(); its return value is what the 'add_steps'
# task pulls through XCom below.
DEFINE_PARAMETERS = PythonOperator(
    task_id='DEFINE_PARAMETERS',
    python_callable=define_param,
    provide_context=True,
    dag=dag,
)

# NOTE(review): JOB_FLOW_OVERRIDES is not defined in this snippet; presumably
# it is taken from the linked example DAG.
cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag,
)

# `steps` is a template string here, so the operator receives the rendered
# text rather than a list object.
step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps='{{ ti.xcom_pull(task_ids="DEFINE_PARAMETERS") }}',
    dag=dag,
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag,
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag,
)

# 'add_steps' pulls XComs from both 'create_job_flow' and 'DEFINE_PARAMETERS',
# so both must be upstream of it. The original never linked DEFINE_PARAMETERS
# into the chain, leaving its ordering relative to 'add_steps' undefined.
cluster_creator.set_downstream(step_adder)
DEFINE_PARAMETERS.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)
I cannot use Variable.get and Variable.set, because Airflow Variables are global: concurrent DAG runs triggered with different values would constantly overwrite each other's variables. I have also tried passing SPARK_TEST_STEPS through XCom, but the value rendered from the XCom template is a string, whereas the `steps` argument of EmrAddStepsOperator requires a list.
Upvotes: 1
Views: 2217
Reputation: 31
I solved a similar problem by creating a custom operator that parses the JSON prior to executing. The cause of the problem is that when you pass steps='{{ ti.xcom_pull(task_ids="DEFINE_PARAMETERS") }}', you are literally passing a string with the value interpolated by the templating engine; it is not deserialized.
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
import json
class DynamicEmrStepsOperator(EmrAddStepsOperator):
    """EmrAddStepsOperator variant whose ``steps`` may be a templated string.

    ``steps`` is listed in ``template_fields``, so it can be given as a
    template (for example an ``xcom_pull`` expression). Just before the
    parent's ``execute`` runs, a string value is deserialized into a Python
    object; a value that is not a string is passed through untouched.
    """

    template_fields = ['job_flow_id', 'steps']
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            job_flow_id=None,
            steps="[]",
            *args, **kwargs):
        """Forward all arguments to EmrAddStepsOperator.

        Args:
            job_flow_id: Forwarded unchanged to the parent operator.
            steps: The step definitions, or a string that deserializes to
                them. Defaults to the string ``"[]"``.
            *args: Extra positional arguments for the parent operator.
            **kwargs: Extra keyword arguments for the parent operator.
        """
        super().__init__(
            job_flow_id=job_flow_id,
            steps=steps,
            *args, **kwargs)

    def execute(self, context):
        """Deserialize ``self.steps`` if it is a string, then run the parent.

        The string is first parsed as JSON. If that fails it is parsed as a
        Python literal, which covers text produced by rendering a Python list
        (single-quoted ``repr`` output is not valid JSON).

        Args:
            context: Passed through to the parent's ``execute``.

        Returns:
            Whatever the parent operator's ``execute`` returns.

        Raises:
            ValueError, SyntaxError: If the string is neither valid JSON nor
                a valid Python literal.
        """
        # Local import: `ast` is not among this file's top-level imports.
        import ast

        if isinstance(self.steps, str):
            try:
                self.steps = json.loads(self.steps)
            except ValueError:
                # json.JSONDecodeError is a ValueError subclass.
                # ast.literal_eval only evaluates literals, never arbitrary
                # expressions, so it is safe on rendered template output.
                self.steps = ast.literal_eval(self.steps)
        return super().execute(context)
Upvotes: 3