Reputation: 157
I have a set of tasks in an Airflow DAG for an ETL job that takes data from an open source, transforms it, then makes it available on AWS Athena. The data is split into 5 tables, which hold messages about similar events. I partition the data by date and a group id.
My present setup has a task 'add_partition_{0}_{1}', where {0} is the table and {1} is the group id. This task is created within a nested loop: the outer loop cycles through the group ids and the inner loop cycles through the tables.
I currently add partitions by using Jinja to template an add_partition.sql file. We altered the standard Athena operator so that it can read a file directly. However, the lists being looped over have grown, so ~150 partitioning tasks are now being created. I would like a single add_partitions task that takes the .sql file and generates the statements to add every partition in a single command. I would pass the list of group_ids, the list of tables and the date to the template and create the statements.
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (group_id= '{{ params.group }}', date = '{{ ds }}')
I would like the statements sent to the operator to be like
ALTER TABLE db.table ADD IF NOT EXISTS
PARTITION (group_id= '1', date = '2019-09-30')
ALTER TABLE db.table ADD IF NOT EXISTS
PARTITION (group_id= '2', date = '2019-09-30')
ALTER TABLE db.table ADD IF NOT EXISTS
PARTITION (group_id= '3', date = '2019-09-30')
and so on. Is this possible using Jinja templating? I have seen that looping is possible in Jinja, but I am not sure how to apply it here.
Upvotes: 0
Views: 1839
Reputation: 157
Turns out this is incredibly easy to do with some very simple Jinja templating.
{% for group in params.groups %}
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (group_id= '{{ group }}', date = '{{ ds }}')
{% endfor %}
Then, in the Python code where I use the AWSAthenaOperator, I loop through the tables and pass a list of group ids, the database and the table_name as parameters (ds is provided by Airflow macros).
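To check what the loop produces before wiring it into a DAG, the template can be rendered with plain jinja2 outside Airflow. This is only a minimal sketch: the helper name render_add_partitions and the sample values are made up for illustration, and params and ds are passed in by hand here, whereas in Airflow they come from the task context.

```python
from jinja2 import Template

# Same template as in the answer, kept inline so the sketch is self-contained.
ADD_PARTITIONS_SQL = """\
{% for group in params.groups %}
ALTER TABLE {{ params.database }}.{{ params.table_name }} ADD IF NOT EXISTS
PARTITION (group_id= '{{ group }}', date = '{{ ds }}')
{% endfor %}
"""


def render_add_partitions(database, table_name, groups, ds):
    """Render the template by hand (hypothetical helper, not part of Airflow)."""
    params = {"database": database, "table_name": table_name, "groups": groups}
    # Jinja resolves params.groups via dictionary lookup, as Airflow's params dict would.
    return Template(ADD_PARTITIONS_SQL).render(params=params, ds=ds)


# Sample values chosen to match the statements shown in the question.
rendered = render_add_partitions("db", "table", ["1", "2", "3"], "2019-09-30")
print(rendered)
```

With default Jinja settings the rendered text also contains blank lines where the for and endfor tags were; Jinja's whitespace control (for example {%- ... -%}) can trim them if that matters.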
Upvotes: 1