Reputation: 557
I'm retrieving data from an API and converting it into a pandas dataframe. I'm using the Snowflake Connector for Python to send this data into my Snowflake schema as a table.
I want to use a MERGE statement instead of sending duplicate data into my Snowflake table.
Sample data I'm retrieving from the API:
|-----------|---------|------------|
| log_in_ID | user_id | date       |
|-----------|---------|------------|
| 1         | 21      | 02/21/2021 |
| 2         | 22      | 02/24/2021 |
| 3         | 23      | 02/27/2021 |
| 4         | 21      | 02/29/2021 |
|-----------|---------|------------|
The log_in_ID is unique.
Here is my code:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

engine = create_engine(URL(
    account='my_snowflake_account',
    user='user',
    password='password',
    database='my_database',
    schema='my_schema',
    warehouse='warehouse',
    role='ADMIN'))

pandas_df = 'Some code to get API data and convert into pandas dataframe'

def send_to_snowflake(pandas_df, target_table):
    connection = engine.connect()
    # Appends the dataframe to the target table; this inserts every row,
    # including rows that already exist in the table.
    pandas_df.to_sql(target_table, con=engine, index=False, if_exists='append')
    connection.close()
    engine.dispose()

if __name__ == "__main__":
    send_to_snowflake(pandas_df, target_table)
How can I use a MERGE statement with log_in_id as the unique key?
How can I use a pandas dataframe inside the MERGE query with the Snowflake Python connector?
merge into target_table using {pandas_dataframe}
on target_table.log_in_id = {pandas_dataframe}.log_in_id
when matched then
    update set target_table.user_id = {pandas_dataframe}.user_id,
               target_table.date = {pandas_dataframe}.date
Upvotes: 3
Views: 3932
Reputation: 1321
If the structure of your API data is similar to the format below: [(1, 21, 'A'), (2, 22, 'AA'), (3, 23, 'AB'), (4, 21, 'AC')]
then this code will merge the API data into the Snowflake target table without first loading the source data into a table:
import requests
import json
import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

def sample_func():
    engine = create_engine(URL(
        account='xxx',
        user='xxx',
        password='xxx',
        database='xxx',
        schema='PUBLIC',
        warehouse='COMPUTE_WH',
        role='xxx',
    ))
    connection = engine.connect()
    # Stand-in for the API call: pull the sample rows from table A.
    pandas_df = 'select * from A'
    try:
        cursor_return = connection.execute(pandas_df)
        cursor_result = cursor_return.fetchall()
        # Strip the outer brackets so the list of row tuples reads as a
        # SQL values list, e.g. (1, 21, 'A'), (2, 22, 'AA'), ...
        api_data = str(cursor_result)[1:-1]
        print(api_data)
        # Inline the values list as the merge source, so no staging table
        # is needed.
        merge_temp = """
        merge into B target_table using (select COLUMN1, COLUMN2, COLUMN3 from values {0}) src
        on target_table.log_in_id = src.COLUMN1
        when matched then
            update set target_table.log_in_id = src.COLUMN1,
                       target_table.user_id = src.COLUMN2,
                       target_table.test_data = src.COLUMN3
        when not matched then
            insert (log_in_id, user_id, test_data)
            values (src.COLUMN1, src.COLUMN2, src.COLUMN3)
        """.format(str(api_data))
        print(merge_temp)
        c_return = connection.execute(merge_temp)
        c_result = c_return.fetchall()
        print(c_result)
        # MERGE returns a single row: (rows inserted, rows updated).
        print("Number of rows inserted: {0} || Number of rows updated: {1}".format(str(c_result[0][0]), str(c_result[0][1])))
    finally:
        connection.close()
        engine.dispose()

sample_func()
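If your API data is already in a pandas dataframe rather than in a table like A, you can build the same values list directly from the dataframe. A minimal sketch, not tested against your schema, assuming the dataframe columns are in the same order as COLUMN1, COLUMN2, COLUMN3 above:

# Render each dataframe row as a SQL row literal such as (1, 21, 'A').
# Note: this naive str() rendering assumes plain Python ints/strings;
# quoting, escaping, and numpy scalar types are glossed over here.
api_data = ', '.join(str(tuple(row)) for row in pandas_df.itertuples(index=False))

The resulting api_data string can be substituted into merge_temp exactly as in the code above.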
But I would recommend loading your API data into a temporary table and running the MERGE against that temp table; this approach will be faster than merging straight from a dataframe or CSV files.
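For reference, a minimal sketch of that temp-table approach, using the write_pandas helper from snowflake-connector-python and the hypothetical names SOURCE_TEMP and TARGET_TABLE (the dataframe's column names must match the temp table's uppercase column names):

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

conn = snowflake.connector.connect(
    account='xxx', user='xxx', password='xxx',
    database='xxx', schema='PUBLIC', warehouse='COMPUTE_WH')
cur = conn.cursor()
# Stage the API data in a temporary table that disappears with the session.
cur.execute("create temporary table SOURCE_TEMP "
            "(LOG_IN_ID integer, USER_ID integer, TEST_DATA varchar)")
write_pandas(conn, pandas_df, 'SOURCE_TEMP')
# Merge the staged rows into the target, keyed on log_in_id.
cur.execute("""
    merge into TARGET_TABLE t
    using SOURCE_TEMP s
    on t.log_in_id = s.log_in_id
    when matched then
        update set t.user_id = s.user_id, t.test_data = s.test_data
    when not matched then
        insert (log_in_id, user_id, test_data)
        values (s.log_in_id, s.user_id, s.test_data)
""")
conn.close()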
Upvotes: 5