Reputation: 33
1)Python to snowflake data connection is performed using python connector 2)External stage is set pointing to S3 bucket The requirement is to create a dynamic table based on each CSV.
Eg- I have 10 CSV present in the s3 bucket then 10 different tables should get created dynamically referring to the external stage
Sql_query=?
Sql_query= copy into db.schema.table from @db.schema.external_stage (In scenario where table structure is already created in snowflake)
Upvotes: 0
Views: 2666
Reputation: 2746
Refer for python - snowflake connection.
Refer for query data from snowflake using python.
Below Example will read a stage with two files and create two tables based on file names in stage and then insert data into each table.
This is how the stage looks [output truncated, column-wise] -
list @test_stage;
+------------------------------+------+
| name | size |
|------------------------------+------+
| test_stage/data_comma.csv.gz | 128 |
| test_stage/date_data.csv.gz | 128 |
+------------------------------+------+
Python code to create dynamic tables and insert data from stage
import snowflake.connector
# Below setting up connection properties
# Providing all values for database, schema
# therefore prefixing those values later in code
# is not needed - can be modified as needed.
con = snowflake.connector.connect(
user='user',
password='password',
account='ab313.us-east-2.aws',
warehouse='COMPUTE_WH',
database='TEST_DB',
schema='PUBLIC'
)
try:
cur = con.cursor()
# Query stage and get file-names
# Split file name to extract part NOT containing extensions
cur.execute("select DISTINCT split_part(metadata$filename,'.',1) as col1, metadata$filename as col2 from @test_stage;")
for (col1,col2) in cur:
# Dynamic table creation is based on files in stage.
# Here stage files are with skip-header, so column name are
# hard-coded.
# Even if we try and get column-names from header row
# (if included) in stage file, data-types still needs
# hard-coding or perhaps fetching from other table
# if needed complete dynamism.
cur.execute("create transient table "+col1+"(id number);")
# Copy data into table created above from stage file
cur.execute("copy into "+col1+" from (select $1 from @test_stage/"+col2+");")
finally:
con.close()
Upvotes: 0