Reputation: 11
I am currently sourcing data from Oracle.
As part of the initial load, I ingested all historical data from the Oracle table oracle_a into the Snowflake table "snow_a" using a named stage and COPY INTO commands.
I would like to perform SCD Type 2 on the snow_a table based on the oracle_a table.
That is, if a new record is added to the oracle_a table, that record should be inserted into snow_a; if an existing record in oracle_a changes,
the existing record in snow_a should be expired and the changed record inserted as a new version. See the image below for further details.
The oracle_a table has the key columns key_col1, key_col2 and key_col3, as shown in the image below; attr1 and attr2 are the other attributes of the table.
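Before any SCD2 logic can run, the history already bulk-loaded into snow_a needs tracking columns. The following is a minimal sketch. It assumes that snow_a was created without quoted identifiers, that it holds only key_col1..key_col3, attr1 and attr2, and that each key appears once in the history. The column names eff_ts, exp_ts and active_fl are hypothetical. Stamping the history with the load time is also an assumption; a business effective date from Oracle may be more appropriate.

```sql
-- One-time preparation of the history table loaded by COPY INTO.
-- eff_ts / exp_ts / active_fl are hypothetical SCD2 tracking columns.
ALTER TABLE snow_a ADD COLUMN
    eff_ts    TIMESTAMP_LTZ(0),
    exp_ts    TIMESTAMP_LTZ(0),
    active_fl CHAR(1);

-- Treat every historical row as the current version of its key:
-- effective from the load time, open-ended expiry, active flag 'Y'.
UPDATE snow_a
SET eff_ts    = CURRENT_TIMESTAMP(0),
    exp_ts    = '9999-12-31 23:59:59'::TIMESTAMP_LTZ(0),
    active_fl = 'Y';
```

If oracle_a already kept several versions per key, only the latest of them should end up with active_fl = 'Y', so the UPDATE would need a per-key filter.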
Upvotes: 1
Views: 470
Reputation: 241
Here's a solution based on the following assumptions:
First create the tables and add the first set of delta data:
CREATE OR REPLACE TABLE stg.cdc2_oracle_d (
key1 varchar(10),
key2 varchar(10),
key3 varchar(10),
attr1 varchar(8),
attr2 varchar(8));
CREATE OR REPLACE TABLE edw.cdc2_snowflake_d (
key1 varchar(10),
key2 varchar(10),
key3 varchar(10),
attr1 varchar(8),
attr2 varchar(8),
eff_ts TIMESTAMP_LTZ(0),
exp_ts TIMESTAMP_LTZ(0),
active_fl char(1));
INSERT INTO stg.cdc2_oracle_d VALUES
( 'PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.0'),
( 'PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),
( 'PT_3', 'DL_3', 'RPT_3', 'Addr3a', 'APT_3.0');
Then run the following transformation script:
BEGIN;
-- 1: insert brand-new records from the stg table whose keys don't currently exist in the edw table
INSERT INTO edw.cdc2_snowflake_d
SELECT
key1,
key2,
key3,
attr1,
attr2,
CURRENT_TIMESTAMP(0) AS eff_ts,
CAST('9999-12-31 23:59:59' AS TIMESTAMP_LTZ(0)) AS exp_ts,
'Y' AS active_fl
FROM stg.cdc2_oracle_d stg
WHERE NOT EXISTS (
SELECT 1
FROM edw.cdc2_snowflake_d edw
WHERE edw.key1 = stg.key1
AND edw.key2 = stg.key2
AND edw.key3 = stg.key3
AND edw.active_fl = 'Y');
-- 2: insert a new version of a record from the stg table whose key currently exists in the edw table,
-- but only if the attr columns differ; otherwise it's the same record
INSERT INTO edw.cdc2_snowflake_d
SELECT
stg.key1,
stg.key2,
stg.key3,
stg.attr1,
stg.attr2,
CURRENT_TIMESTAMP(0) AS eff_ts,
CAST('9999-12-31 23:59:59' AS TIMESTAMP_LTZ(0)) AS exp_ts,
'T' AS active_fl -- set the flag to 'T' (temporary) until step 4
FROM stg.cdc2_oracle_d stg
JOIN edw.cdc2_snowflake_d edw ON edw.key1 = stg.key1 AND edw.key2 = stg.key2
AND edw.key3 = stg.key3 AND edw.active_fl = 'Y'
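-- IS DISTINCT FROM also catches changes to or from NULL, which <> would miss
WHERE (stg.attr1 IS DISTINCT FROM edw.attr1
OR stg.attr2 IS DISTINCT FROM edw.attr2);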
-- 3: deactivate the current record wherever step 2 added a new version,
-- and set its exp_ts to 1 second before the new record's eff_ts so the periods don't overlap
UPDATE edw.cdc2_snowflake_d old
SET old.active_fl = 'N',
old.exp_ts = DATEADD(SECOND, -1, new.eff_ts)
FROM edw.cdc2_snowflake_d new
WHERE old.key1 = new.key1
AND old.key2 = new.key2
AND old.key3 = new.key3
AND new.active_fl = 'T'
AND old.active_fl = 'Y';
-- 4: finally set all the temporary records to active
UPDATE edw.cdc2_snowflake_d tmp
SET tmp.active_fl = 'Y'
WHERE tmp.active_fl = 'T';
COMMIT;
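If you would rather do this in a single statement, you could use a common MERGE pattern that is not part of this answer. The MERGE source lists every staged row once by key, then lists changed rows a second time with NULL join keys. The first copy expires the old version, and the second copy falls through to the INSERT branch. Below is a sketch against the same stg/edw tables. It assumes the staging table holds at most one row per key, because otherwise the MERGE would match a target row more than once. The aliases are illustrative.

```sql
-- Alternative to steps 1-4: a single MERGE (sketch; assumes one staged row per key).
MERGE INTO edw.cdc2_snowflake_d tgt
USING (
    -- Every staged row, keyed normally: matches the active version if one exists.
    SELECT stg.key1 AS join_key1, stg.key2 AS join_key2, stg.key3 AS join_key3, stg.*
    FROM stg.cdc2_oracle_d stg
    UNION ALL
    -- Changed rows again, with NULL join keys so they never match
    -- and therefore land in the INSERT branch as the new version.
    SELECT NULL, NULL, NULL, stg.*
    FROM stg.cdc2_oracle_d stg
    JOIN edw.cdc2_snowflake_d cur
      ON  cur.key1 = stg.key1
      AND cur.key2 = stg.key2
      AND cur.key3 = stg.key3
      AND cur.active_fl = 'Y'
    WHERE stg.attr1 IS DISTINCT FROM cur.attr1
       OR stg.attr2 IS DISTINCT FROM cur.attr2
) src
ON  tgt.key1 = src.join_key1
AND tgt.key2 = src.join_key2
AND tgt.key3 = src.join_key3
AND tgt.active_fl = 'Y'
-- Key exists and attributes changed: expire the current version.
WHEN MATCHED AND (tgt.attr1 IS DISTINCT FROM src.attr1
               OR tgt.attr2 IS DISTINCT FROM src.attr2) THEN
    UPDATE SET active_fl = 'N',
               exp_ts    = DATEADD(SECOND, -1, CURRENT_TIMESTAMP(0))
-- Brand-new key, or the NULL-keyed copy of a changed row: insert a new active version.
WHEN NOT MATCHED THEN
    INSERT (key1, key2, key3, attr1, attr2, eff_ts, exp_ts, active_fl)
    VALUES (src.key1, src.key2, src.key3, src.attr1, src.attr2,
            CURRENT_TIMESTAMP(0),
            '9999-12-31 23:59:59'::TIMESTAMP_LTZ(0),
            'Y');
```

Because the expire and the insert happen in one statement, this variant doesn't need the temporary 'T' flag. Unchanged keys match but fail the WHEN MATCHED condition, so the MERGE leaves them alone.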
Review the results, then truncate the staging table, load the next set of delta data, and run the transformation script again:
SELECT * FROM stg.cdc2_oracle_d;
SELECT * FROM edw.cdc2_snowflake_d ORDER BY 1,2,3,5;
TRUNCATE TABLE stg.cdc2_oracle_d;
INSERT INTO stg.cdc2_oracle_d VALUES
( 'PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.1'), -- record has updated attr2
( 'PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'), -- record has no changes
( 'PT_4', 'DL_4', 'RPT_4', 'Addr4a', 'APT_4.0'); -- new record
You'll see that PT_1 has 2 records with non-overlapping timestamps, and only one of them is active.
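To check this for every key instead of reading the output by eye, you can look for keys with more than one active row and for versions whose periods overlap. Here is a sketch against the same edw table; both queries should return no rows if the script behaved as intended.

```sql
-- 1) Keys with more than one active version (should return no rows).
SELECT key1, key2, key3, COUNT(*) AS active_versions
FROM edw.cdc2_snowflake_d
WHERE active_fl = 'Y'
GROUP BY key1, key2, key3
HAVING COUNT(*) > 1;

-- 2) Versions of the same key whose [eff_ts, exp_ts] periods overlap (should return no rows).
SELECT a.key1, a.key2, a.key3, a.eff_ts, a.exp_ts, b.eff_ts AS later_eff_ts
FROM edw.cdc2_snowflake_d a
JOIN edw.cdc2_snowflake_d b
  ON  a.key1 = b.key1
  AND a.key2 = b.key2
  AND a.key3 = b.key3
  AND a.eff_ts < b.eff_ts       -- b is a later version of the same key
WHERE a.exp_ts >= b.eff_ts;     -- earlier version still open when the later one starts
```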
Upvotes: 0
Reputation: 11
I used the following steps to perform SCD2.
Upvotes: 0
Reputation: 31
Implementing SCD Type 2 functionality on a table in Snowflake is no different from doing so in any other relational database. However, Snowflake offers additional features that can help with this process. Please have a look at this blog post series on using Snowflake Streams and Tasks to perform the SCD logic. https://www.snowflake.com/blog/building-a-type-2-slowly-changing-dimension-in-snowflake-using-streams-and-tasks-part-1/
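A rough outline of the streams-and-tasks wiring follows. It is only a sketch and reuses the staging table name from the first answer. The stream name, task name, warehouse my_wh, the 5-minute schedule, and the stored procedure edw.apply_cdc2_scd2 are all hypothetical. The procedure stands in for whatever SCD2 statements you run.

```sql
-- Stream recording rows inserted into the staging table (hypothetical name).
-- APPEND_ONLY = TRUE tracks inserts only, so truncating staging is not captured.
CREATE OR REPLACE STREAM stg.cdc2_oracle_d_strm
  ON TABLE stg.cdc2_oracle_d
  APPEND_ONLY = TRUE;

-- Task that runs on a schedule, but only when the stream has new rows.
-- Warehouse, schedule and procedure name are hypothetical.
CREATE OR REPLACE TASK edw.cdc2_scd2_task
  WAREHOUSE = my_wh
  SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('stg.cdc2_oracle_d_strm')
AS
  CALL edw.apply_cdc2_scd2();

-- Tasks are created suspended; resume it to start the schedule.
ALTER TASK edw.cdc2_scd2_task RESUME;
```

Inside the procedure, the SCD2 statements would read from stg.cdc2_oracle_d_strm instead of stg.cdc2_oracle_d. The stream's offset advances only when a DML statement that reads from the stream commits.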
Cheers, Michael Rainey
Upvotes: 2
Reputation: 1
From the question, it seems that the incoming Oracle rows do not contain any SCD2 columns, and that each row inserted into Snowflake is expected to be handled with SCD2 logic.
SCD2 columns can have a specific meaning to the business; for example, 'exp_ts' could be a system (load) timestamp or a business date. Loading through a Snowflake stage does not include SCD2 functionality. This is usually the role of an ETL framework, not of a fast bulk-load utility.
Most ETL vendors offer SCD2 functions as part of their products.
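If no ETL tool is available, the same split can be done in plain SQL. The bulk load only lands the Oracle extract in a staging table, and a separate step applies the SCD2 rules. The sketch below makes three assumptions: a hypothetical named stage @oracle_stage holding a CSV extract with a header row under a hypothetical oracle_a/ path, and the staging table from the first answer as the load target.

```sql
-- Step 1: bulk load the latest Oracle extract into staging (no SCD2 logic here).
TRUNCATE TABLE stg.cdc2_oracle_d;

COPY INTO stg.cdc2_oracle_d
  FROM @oracle_stage/oracle_a/
  FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"');

-- Step 2 (not shown): apply the SCD2 rules from staging to the target,
-- e.g. the BEGIN ... COMMIT transformation script in the first answer.
```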
Upvotes: 0
Reputation: 541
OK, so here is what I found, though you may need to adjust where the update and insert come from, since oracle_a is not in Snowflake.
CREATE TABLE snowflake_a(key_col1 varchar(10), key_col2 varchar(10), key_col3 varchar(10), attr1 varchar(8), attr2 varchar(10), eff_ts TIMESTAMP, exp_ts TIMESTAMP, valid varchar(10));
DROP TABLE IF EXISTS oracle_a;
INSERT INTO snowflake_a VALUES('PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', current_date, '9999-12-31', 'Active');
CREATE TABLE oracle_a(key_col1 varchar(10), key_col2 varchar(10), key_col3 varchar(10), attr1 varchar(8), attr2 varchar(8), eff_ts TIMESTAMP, exp_ts TIMESTAMP);
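INSERT INTO oracle_a
VALUES( 'PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', '2019-10-24', '9999-12-31');
-- expire only the active version of the changed key, and record when it expired
UPDATE snowflake_a
SET valid = 'Expired', exp_ts = CURRENT_TIMESTAMP
WHERE valid = 'Active'
AND key_col1 = 'PT_1' AND key_col2 = 'DL_1' AND key_col3 = 'RPT_1';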
SELECT * FROM snowflake_a;
INSERT INTO snowflake_a VALUES( 'PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', '2019-10-24', '9999-12-31', 'Active');
SELECT * FROM snowflake_a;
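Once oracle_a's rows are available in Snowflake, the expire and insert can be driven from oracle_a with a join instead of hard-coded values. The sketch below uses the snowflake_a and oracle_a tables created above. It touches only keys whose attributes changed, plus keys that are brand new.

```sql
-- Expire the active version of every key whose attributes changed in oracle_a.
UPDATE snowflake_a
SET valid  = 'Expired',
    exp_ts = CURRENT_TIMESTAMP
FROM oracle_a o
WHERE snowflake_a.key_col1 = o.key_col1
  AND snowflake_a.key_col2 = o.key_col2
  AND snowflake_a.key_col3 = o.key_col3
  AND snowflake_a.valid = 'Active'
  AND (snowflake_a.attr1 IS DISTINCT FROM o.attr1
       OR snowflake_a.attr2 IS DISTINCT FROM o.attr2);

-- Insert a new active version for every key in oracle_a that has no active row
-- (the keys expired above, plus brand-new keys).
INSERT INTO snowflake_a (key_col1, key_col2, key_col3, attr1, attr2, eff_ts, exp_ts, valid)
SELECT o.key_col1, o.key_col2, o.key_col3, o.attr1, o.attr2,
       CURRENT_TIMESTAMP, '9999-12-31'::TIMESTAMP, 'Active'
FROM oracle_a o
WHERE NOT EXISTS (
    SELECT 1
    FROM snowflake_a s
    WHERE s.key_col1 = o.key_col1
      AND s.key_col2 = o.key_col2
      AND s.key_col3 = o.key_col3
      AND s.valid = 'Active');
```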
Or better yet, what are you using to connect from your Oracle ecosystem to the Snowflake ecosystem?
Upvotes: 1