Yohei Onishi

Reputation: 1524

How to use jinja template in Airflow MySQL Operator

I am currently running this query with Airflow's MySqlOperator. How can I replace the region and S3 bucket with parameters using a Jinja template?

sql = """SELECT * FROM test
INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\\n'
OVERWRITE ON;
"""
mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=sql,
  mysql_conn_id=MYSQL_CONN_ID,
  parameters={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)

Upvotes: 3

Views: 3518

Answers (2)

Elad Kalif

Reputation: 15931

You can use params to pass dynamic values to your SQL:

sql = """SELECT * FROM test
INTO OUTFILE S3 's3-{{ params.region }}://{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'
CHARACTER SET utf8
FORMAT CSV HEADER
FIELDS
  TERMINATED BY ','
  OPTIONALLY ENCLOSED BY '"'
LINES
  TERMINATED BY '\\n'
OVERWRITE ON;
"""

mysql_to_s3 = MySqlOperator(
  task_id="mysql_to_s3",
  dag=dag,
  sql=sql,
  mysql_conn_id=MYSQL_CONN_ID,
  params={
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
  },
  autocommit=False,
  database="test",
)

If the values are stored in Airflow Variables (region, s3_bucket, s3_key_prefix), you can remove the params dict from the operator and change your sql to:

INTO OUTFILE S3 's3-{{ var.value.region }}://{{ var.value.s3_bucket }}/{{ var.value.s3_key_prefix }}'

In both options, Airflow templates the sql string and replaces the placeholders with the actual values when the operator executes. You can inspect the rendered SQL in the task's Rendered tab in the UI.
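To see what the rendered SQL would look like without running a DAG, here is a minimal stand-in for the `{{ params.<name> }}` substitution using only the standard library (Airflow itself uses the Jinja2 engine for this; the values below just mirror the params dict from the question):

```python
import re

# Values mirroring the operator's params dict from the question.
params = {
    "s3_bucket": "my-s3-bucket",
    "s3_key_prefix": "my-key",
    "region": "ap-southeast-1",
}

sql = (
    "SELECT * FROM test\n"
    "INTO OUTFILE S3 's3-{{ params.region }}://"
    "{{ params.s3_bucket }}/{{ params.s3_key_prefix }}'"
)

# Replace each {{ params.<name> }} placeholder with its value,
# roughly what Airflow's Jinja rendering does at execution time.
rendered = re.sub(
    r"\{\{\s*params\.(\w+)\s*\}\}",
    lambda m: params[m.group(1)],
    sql,
)

print(rendered)
# INTO OUTFILE S3 's3-ap-southeast-1://my-s3-bucket/my-key'
```

This is only a sketch of the mechanism; in a real DAG you never render manually, since the operator does it for you before executing the SQL.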

Upvotes: 3
