Reputation: 21
I am new to ETL and working on airflow and snowflake. I am getting max created values using python operator from mysql table and on base of the xcom of that operator I am create a csv file of snowflake data to dump only latest created at data from mysql to snowflake. The issue is airflow xcom is returning in double quotes when I pull value inside sql template. While snowflake accepts the single qoutes in its sql query. Error image
Following is my DAG code:
def defaultconverter(o):
if isinstance(o, datetime):
return o.__str__()
def get_max_created_timestamp(sql_table_name):
hook = MySqlHook(MYSQL_CONN)
check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
f"and table_schema = '{MYSQL_SCHEMA}';"
print(hook.schema)
data = hook.get_records(check_column)
if any('created_at' in x for x in data):
date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
(created_timestamp_max,) = hook.get_first(date_sql)
return json.dumps(created_timestamp_max, default=defaultconverter)
# return int(created_timestamp_max)
else:
return 0
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": [],
"email_on_failure": True,
"email_on_retry": False,
"retries": 0,
"retry_delay": timedelta(minutes=5),
"template_searchpath": [TEMPLATE_SEARCHPATH, ]
}
with DAG(dag_id="lion_sense_snowflake_to_mysql_v1",
start_date=datetime(2021, 12, 1, 0, 0, 0, 0),
schedule_interval="@daily",
catchup=False,
default_args=default_args,
max_active_runs=1,
) as dag:
dag.doc_md = DOCS
for table in tables:
mysql_table = table["mysql_table"]
snowflake_table = table["snowflake_table"]
delete_flag = table["delete"]
get_max_timestamp_task = PythonOperator(
task_id=f"get_max_timestamp_{mysql_table}",
python_callable=get_max_created_timestamp,
op_args=[mysql_table, ],
do_xcom_push=True,
)
create_snowflake_table_csv = SnowflakeOperator(
task_id=f"create_snowflake_{snowflake_table}_table_csv",
dag=dag,
sql="sql/convert_snowflake_table_to_csv.sql",
snowflake_conn_id=SNOWFLAKE_CONN_ID,
warehouse=SNOWFLAKE_WAREHOUSE,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
role=SNOWFLAKE_ROLE,
params={
"snowflake_table": snowflake_table,
"delete_flag": delete_flag,
"max_date": get_max_timestamp_task.output
}
)
Mysql query template:
copy into @S3_TKXEL_DEVEOPMENT_STAGE/airflow/{{ dag.dag_id }}/{{ ds_nodash }}/{{ params.snowflake_table }}/{{ ds_nodash }}_{{ params.snowflake_table }}.csv
from (
select * from {{ params.snowflake_table }}
{% if params.delete_flag %}
where created_at > {{ params.max_date}}
{% endif %}
)
file_format = (TYPE = CSV, COMPRESSION = NONE, NULL_IF=(''), field_optionally_enclosed_by='"' )
OVERWRITE = TRUE
SINGLE = TRUE
MAX_FILE_SIZE=5000000000;
Thanks in advance for adding into my knowledge.
Upvotes: 2
Views: 1152
Reputation: 25968
does changing the output of get_max_created_timestamp
:
def get_max_created_timestamp(sql_table_name):
hook = MySqlHook(MYSQL_CONN)
check_column = f"select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = '{sql_table_name}' " \
f"and table_schema = '{MYSQL_SCHEMA}';"
print(hook.schema)
data = hook.get_records(check_column)
if any('created_at' in x for x in data):
date_sql = f'select max(created_at) created_timestamp_max from {MYSQL_SCHEMA}.{sql_table_name}'
(created_timestamp_max,) = hook.get_first(date_sql)
return "'" + created_timestamp_max + "'"
else:
return 0
help, as now the string will be correctly quoted for snowflakes string expectations.
Upvotes: 1