Reputation: 1524
I am currently running this query in Airflow's MySqlOperator. How can I replace the region and the S3 bucket with parameters using a Jinja template?
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES
TERMINATED BY '\\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
    task_id="mysql_to_s3",
    dag=dag,
    sql=sql,
    mysql_conn_id=MYSQL_CONN_ID,
    parameters={
        "s3_bucket": "my-s3-bucket",
        "s3_key_prefix": "my-key",
        "region": "ap-southeast-1",
    },
    autocommit=False,
    database="test",
)
Upvotes: 3
Views: 3518
Reputation: 15931
You can use params to pass dynamic values to your SQL:
sql = """SELECT * FROM test
INTO OUTFILE S3 's3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
TERMINATED BY ','
OPTIONALLY ENCLOSED BY '"'
LINES
TERMINATED BY '\\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
    task_id="mysql_to_s3",
    dag=dag,
    sql=sql,
    mysql_conn_id=MYSQL_CONN_ID,
    params={
        "s3_bucket": "my-s3-bucket",
        "s3_key_prefix": "my-key",
        "region": "ap-southeast-1",
    },
    autocommit=False,
    database="test",
)
If the values are stored in Airflow Variables (region, s3_bucket, s3_key_prefix), then you can remove the params dict from the operator and change your sql to:
INTO OUTFILE S3 's3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'
In both options Airflow will template the sql string and replace the placeholders with the actual values when the operator is executed. You can inspect the result in the task's Rendered Template tab in the UI.
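If you want to preview the rendered SQL outside of Airflow, here is a minimal sketch using the jinja2 library directly (Airflow's templating is built on Jinja2, though the real render also injects macros and context). The bucket, key, and region values are the example placeholders from the question, not real resources:

```python
# Sketch of what Airflow's templating does to the sql string at execution
# time. Requires the jinja2 package (a core Airflow dependency).
from jinja2 import Template

sql = (
    "SELECT * FROM test "
    "INTO OUTFILE S3 "
    "'s3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'"
)

# Airflow passes the operator's `params` dict into the template context
# under the name "params", which is what makes {{ params.region }} resolve.
rendered = Template(sql).render(
    params={
        "s3_bucket": "my-s3-bucket",
        "s3_key_prefix": "my-key",
        "region": "ap-southeast-1",
    }
)
print(rendered)
```

This should print the fully expanded statement with the URI s3-ap-southeast-1://my-s3-bucket/my-key, matching what the Rendered Template tab would show.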
Upvotes: 3
Reputation: 439
You can use airflow variables - https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
Airflow jinja template support - https://airflow.apache.org/docs/apache-airflow/stable/concepts/operators.html#concepts-jinja-templating
Upvotes: 0