Reputation: 33
I am new to Airflow and am trying to understand one thing. For example, I have a DAG that contains two tasks. The first task submits a Spark job, and the second is a sensor that waits for a file in S3.
RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
DATE = datetime.strptime(RUN_DATE_ARG, DATE_FORMAT_PY) - timedelta(hours=1)
with DAG() as dag:
    submit_spark_job = EmrContainerOperator(
        task_id="start_job",
        virtual_cluster_id=VIRTUAL_CLUSTER_ID,
        execution_role_arn=JOB_ROLE_ARN,
        release_label="emr-6.3.0-latest",
        job_driver=JOB_DRIVER_ARG,
        configuration_overrides=CONFIGURATION_OVERRIDES_ARG,
        name=f"spark-{RUN_DATE_ARG}",
        retries=3
    )
    validate_s3_success_file = S3KeySensor(
        task_id='check_for_success_file',
        bucket_name="bucket-name",
        bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS",
        poke_interval=10,
        timeout=60,
        verify=False,
    )
By default, RUN_DATE_ARG is taken from datetime.utcnow(), and it is one of the Java arguments that I have to pass to my Spark job.
I want to add the ability to submit the job with a custom date argument (via the Airflow UI).
When I try to retrieve it as '{{ dag_run.conf["date"] | None}}', the value is substituted inside the task configuration (bucket_key=f"blabla/date={DATE.strftime('%Y-%m-%d')}/hour={DATE.strftime('%H')}/_SUCCESS"), but not in the DAG's Python code if I do the following:
date = '{{ dag_run.conf["date"] | None}}'
if date is None:
    RUN_DATE_ARG = datetime.utcnow().strftime(DATE_FORMAT_PY)
else:
    RUN_DATE_ARG = date
Is there any way to use this value as a code variable?
Upvotes: 2
Views: 1889
Reputation: 15979
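You cannot use templating outside of an operator's templated parameters. In your snippet, date is an ordinary Python string at the time the DAG file is parsed, so it is never None and Jinja never gets a chance to render it.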
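To make that concrete, here is a minimal sketch in plain Python (no Airflow involved) of what the check from the question sees at parse time. The variable names is_none and still_unrendered are only for illustration:

```python
# At DAG-parse time the template is just a string literal; nothing has
# rendered it yet, so comparing it with None can never succeed.
date = '{{ dag_run.conf["date"] | None}}'

is_none = date is None                    # the check used in the question
still_unrendered = date.startswith("{{")  # the Jinja markers are still there

print(type(date).__name__, is_none, still_unrendered)
```

Because is_none is always False here, the else branch in the question always runs and RUN_DATE_ARG ends up holding the raw template text.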
Instead, use a Jinja conditional inside the operator's templated parameter. The following is just a general idea:
submit_spark_job = EmrContainerOperator(
    task_id="start_job",
    ...
    name='spark-{{ dag_run.conf["date"] if dag_run.conf.get("date") else jinja_utc_now }}',
)
You will need to replace jinja_utc_now with code that retrieves the timestamp, probably something like what is shown in this answer.
You can also use:
{% if something %}
code
{% else %}
another code
{% endif %}
From Airflow's point of view, it takes the parameter and passes it through the Jinja engine for templating, so the key issue here is just to use proper Jinja syntax.
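As a rough sketch of that rendering step, the snippet below calls the jinja2 library directly, outside Airflow. NAME_TEMPLATE, render_name and fallback are made-up names for illustration, SimpleNamespace only stands in for the real dag_run object, and fallback plays the role of jinja_utc_now:

```python
from types import SimpleNamespace

from jinja2 import Template

# Hypothetical template: use the "date" key from conf if it is set,
# otherwise fall back to a default value supplied at render time.
NAME_TEMPLATE = 'spark-{{ dag_run.conf["date"] if dag_run.conf.get("date") else fallback }}'


def render_name(conf, fallback):
    """Render NAME_TEMPLATE against a stand-in dag_run carrying the given conf."""
    dag_run = SimpleNamespace(conf=conf)  # stand-in, not Airflow's DagRun
    return Template(NAME_TEMPLATE).render(dag_run=dag_run, fallback=fallback)


# Run triggered with a custom date in its conf
with_date = render_name({"date": "2021-09-01-10"}, fallback="2021-09-02-00")
# Run triggered without any conf
without_date = render_name({}, fallback="2021-09-02-00")

print(with_date)
print(without_date)
```

Using conf.get("date") in the condition keeps a missing key from being treated as a real value; whether a given operator parameter is rendered at all depends on it being listed in that operator's template_fields.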
Upvotes: 2