RaoSK

Reputation: 11

Perform SCD2 on a Snowflake table based on Oracle input data

I am currently sourcing data from Oracle.

As part of the initial load, I ingested all historical data from the Oracle table oracle_a into the Snowflake table "snow_a" using a named stage and COPY INTO commands.

I would like to perform SCD2 on the snow_a table based on the oracle_a table.

That is, if a new record is added to oracle_a, that record should be inserted into snow_a. If an existing record in oracle_a changes, the existing record in snow_a should be expired and the changed record inserted. See the image below for further details.

The oracle_a table has the key columns key_col1, key_col2, and key_col3, as shown in the image below; attr1 and attr2 are the other attributes of the table.
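Before writing any SQL, the rules above can be pinned down with a small in-memory model. This is a hypothetical Python sketch, not anyone's production code: `Version`, `apply_scd2`, the 9999-12-31 "high date" marking the current version, and expiring the old version one second before the new one starts are all illustrative assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta

HIGH_TS = datetime(9999, 12, 31, 23, 59, 59)  # assumed "open-ended" expiry of the current version


@dataclass
class Version:
    key: tuple      # (key_col1, key_col2, key_col3)
    attr1: str
    attr2: str
    eff_ts: datetime
    exp_ts: datetime = HIGH_TS


def apply_scd2(snow_a: list[Version], oracle_rows: list[tuple], now: datetime) -> None:
    """Apply one batch of oracle_a rows to snow_a in place using SCD2 rules."""
    current = {v.key: v for v in snow_a if v.exp_ts == HIGH_TS}
    for key_col1, key_col2, key_col3, attr1, attr2 in oracle_rows:
        key = (key_col1, key_col2, key_col3)
        active = current.get(key)
        if active is not None and (active.attr1, active.attr2) == (attr1, attr2):
            continue  # unchanged record: nothing to do
        if active is not None:
            # changed record: expire the current version 1 second before the new one starts
            active.exp_ts = now - timedelta(seconds=1)
        new_version = Version(key, attr1, attr2, now)  # new or changed record: insert it
        snow_a.append(new_version)
        current[key] = new_version


snow_a = []
apply_scd2(snow_a, [("PT_1", "DL_1", "RPT_1", "Addr1a", "APT_1.0")], datetime(2019, 10, 24))
apply_scd2(snow_a, [("PT_1", "DL_1", "RPT_1", "Addr1a", "APT_1.1")], datetime(2019, 10, 25))
for v in snow_a:
    print(v.key, v.attr2, v.eff_ts, v.exp_ts)
```

Re-applying an unchanged row leaves snow_a as it is, which is the behaviour the SQL versions also need.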

Upvotes: 1

Views: 470

Answers (5)

Erick Roesch

Reputation: 241

Here's a solution based on the following assumptions:

  1. The source Oracle table is not itself responsible for SCD2 processing (so it has no Eff/Exp TS columns).
  2. There is an external process that only extracts/loads delta (new, updated) records into Snowflake.
  3. The source Oracle table does not delete records.

First create the tables and add the first set of delta data:

CREATE OR REPLACE TABLE stg.cdc2_oracle_d (
  key1 varchar(10),
  key2 varchar(10),
  key3 varchar(10),
  attr1 varchar(8),
  attr2 varchar(8));

CREATE OR REPLACE TABLE edw.cdc2_snowflake_d (
  key1 varchar(10),
  key2 varchar(10),
  key3 varchar(10),
  attr1 varchar(8),
  attr2 varchar(8),
  eff_ts TIMESTAMP_LTZ(0),
  exp_ts TIMESTAMP_LTZ(0),
  active_fl char(1));

INSERT INTO stg.cdc2_oracle_d VALUES
 ( 'PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.0'),
 ( 'PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),
 ( 'PT_3', 'DL_3', 'RPT_3', 'Addr3a', 'APT_3.0');

Then run the following transformation script:

BEGIN;
-- 1: insert brand-new records from the stg table that don't currently exist in the edw table
INSERT INTO edw.cdc2_snowflake_d
SELECT
  key1,
  key2,
  key3,
  attr1,
  attr2,
  CURRENT_TIMESTAMP(0) AS eff_ts,
  CAST('9999-12-31 23:59:59' AS TIMESTAMP) AS exp_ts,
  'Y' AS active_fl
FROM stg.cdc2_oracle_d stg
WHERE NOT EXISTS (
  SELECT 1
    FROM edw.cdc2_snowflake_d edw
   WHERE edw.key1 = stg.key1
     AND edw.key2 = stg.key2
     AND edw.key3 = stg.key3
     AND edw.active_fl = 'Y');

-- 2: insert a new version of the record from the stg table where the key currently does exist in the edw table
      -- but only if the attr columns are different; otherwise it's the same record
INSERT INTO edw.cdc2_snowflake_d
SELECT
  stg.key1,
  stg.key2,
  stg.key3,
  stg.attr1,
  stg.attr2,
  CURRENT_TIMESTAMP(0) AS eff_ts,
  CAST('9999-12-31 23:59:59' AS TIMESTAMP) AS exp_ts,
  'T' AS active_fl  -- set flag to Temporary for now
FROM stg.cdc2_oracle_d stg
JOIN edw.cdc2_snowflake_d edw ON edw.key1 = stg.key1 AND edw.key2 = stg.key2 
                             AND edw.key3 = stg.key3 AND edw.active_fl = 'Y'
WHERE (stg.attr1 IS DISTINCT FROM edw.attr1   -- null-safe: a change to/from NULL still counts
   OR  stg.attr2 IS DISTINCT FROM edw.attr2);

-- 3: deactivate the current record where there is a new record from the step above
      -- and set its exp_ts to 1 second before the new record's eff_ts so there is no overlap in data
UPDATE edw.cdc2_snowflake_d old
   SET old.active_fl = 'N',
       old.exp_ts = DATEADD(SECOND, -1, new.eff_ts)
  FROM edw.cdc2_snowflake_d new
 WHERE old.key1 = new.key1
   AND old.key2 = new.key2
   AND old.key3 = new.key3
   AND new.active_fl = 'T'
   AND old.active_fl = 'Y';

-- 4: finally set all the temporary records to active
UPDATE edw.cdc2_snowflake_d tmp
   SET tmp.active_fl = 'Y'
 WHERE tmp.active_fl = 'T';

COMMIT;
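One caveat about the change check in step 2: a plain `<>` evaluates to NULL when either attribute is NULL, and WHERE treats NULL as "no match", so a change to or from NULL is silently skipped. Snowflake offers null-safe comparison through `IS DISTINCT FROM` (and `EQUAL_NULL`). The quick demo below uses SQLite from Python's standard library as a stand-in for Snowflake (SQLite spells the null-safe operator as a binary `IS NOT`); the `pairs` table is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE pairs (stg_attr2 TEXT, edw_attr2 TEXT)")
conn.executemany("INSERT INTO pairs VALUES (?, ?)", [
    ("APT_1.1", "APT_1.0"),  # ordinary change
    ("APT_2.0", None),       # attribute was NULL in edw, now populated
    (None, "APT_3.0"),       # attribute was cleared in the source
    ("APT_4.0", "APT_4.0"),  # no change
])

# <> yields NULL when either side is NULL, so those rows are filtered out
plain = conn.execute(
    "SELECT COUNT(*) FROM pairs WHERE stg_attr2 <> edw_attr2").fetchone()[0]
# SQLite's binary IS NOT is null-safe (same idea as Snowflake's IS DISTINCT FROM)
null_safe = conn.execute(
    "SELECT COUNT(*) FROM pairs WHERE stg_attr2 IS NOT edw_attr2").fetchone()[0]

print(plain, null_safe)  # 1 3
```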

Review the results, then truncate the staging table, add the new data, and run the script again:

SELECT * FROM stg.cdc2_oracle_d;
SELECT * FROM edw.cdc2_snowflake_d ORDER BY 1,2,3,5;

TRUNCATE TABLE stg.cdc2_oracle_d; 

INSERT INTO stg.cdc2_oracle_d VALUES
 ( 'PT_1', 'DL_1', 'RPT_1', 'Addr1a', 'APT_1.1'),  -- record has updated attr2
 ( 'PT_2', 'DL_2', 'RPT_2', 'Addr2a', 'APT_2.0'),  -- record has no changes
 ( 'PT_4', 'DL_4', 'RPT_4', 'Addr4a', 'APT_4.0');  -- new record

You'll see that PT_1 now has two records with non-overlapping timestamps, and only one of them is active.
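To try the walk-through without a Snowflake account, here is a minimal local replay of steps 1-4 using Python's built-in sqlite3 module. It approximates the script rather than reproducing it exactly: table names are flattened (stg_oracle_d, edw_snowflake_d) because this SQLite setup has no schemas, timestamps are passed in as fixed strings instead of CURRENT_TIMESTAMP, step 3 uses correlated subqueries instead of UPDATE ... FROM, and the attribute check uses SQLite's null-safe `IS NOT`.

```python
import sqlite3

HIGH_TS = "9999-12-31 23:59:59"
KEY_MATCH = "edw.key1 = stg.key1 AND edw.key2 = stg.key2 AND edw.key3 = stg.key3"
# In step 3, edw_snowflake_d (unaliased) is the row being updated; nxt is its 'T' successor
NEXT_MATCH = ("nxt.key1 = edw_snowflake_d.key1 AND nxt.key2 = edw_snowflake_d.key2 "
              "AND nxt.key3 = edw_snowflake_d.key3 AND nxt.active_fl = 'T'")

STEP1 = f"""INSERT INTO edw_snowflake_d
    SELECT key1, key2, key3, attr1, attr2, ?, ?, 'Y' FROM stg_oracle_d stg
    WHERE NOT EXISTS (SELECT 1 FROM edw_snowflake_d edw
                      WHERE {KEY_MATCH} AND edw.active_fl = 'Y')"""
STEP2 = f"""INSERT INTO edw_snowflake_d
    SELECT stg.key1, stg.key2, stg.key3, stg.attr1, stg.attr2, ?, ?, 'T'
    FROM stg_oracle_d stg JOIN edw_snowflake_d edw ON {KEY_MATCH} AND edw.active_fl = 'Y'
    WHERE stg.attr1 IS NOT edw.attr1 OR stg.attr2 IS NOT edw.attr2"""
STEP3 = f"""UPDATE edw_snowflake_d
    SET active_fl = 'N',
        exp_ts = (SELECT datetime(nxt.eff_ts, '-1 seconds')
                  FROM edw_snowflake_d nxt WHERE {NEXT_MATCH})
    WHERE active_fl = 'Y'
      AND EXISTS (SELECT 1 FROM edw_snowflake_d nxt WHERE {NEXT_MATCH})"""
STEP4 = "UPDATE edw_snowflake_d SET active_fl = 'Y' WHERE active_fl = 'T'"

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stg_oracle_d (key1 TEXT, key2 TEXT, key3 TEXT, attr1 TEXT, attr2 TEXT)")
conn.execute("CREATE TABLE edw_snowflake_d (key1 TEXT, key2 TEXT, key3 TEXT, attr1 TEXT, "
             "attr2 TEXT, eff_ts TEXT, exp_ts TEXT, active_fl TEXT)")


def load_and_run(rows, now):
    """Truncate and reload staging, then run steps 1-4 as one transaction."""
    with conn:  # commit on success, roll back on error (like BEGIN ... COMMIT)
        conn.execute("DELETE FROM stg_oracle_d")  # SQLite has no TRUNCATE
        conn.executemany("INSERT INTO stg_oracle_d VALUES (?, ?, ?, ?, ?)", rows)
        conn.execute(STEP1, (now, HIGH_TS))
        conn.execute(STEP2, (now, HIGH_TS))
        conn.execute(STEP3)
        conn.execute(STEP4)


load_and_run([("PT_1", "DL_1", "RPT_1", "Addr1a", "APT_1.0"),
              ("PT_2", "DL_2", "RPT_2", "Addr2a", "APT_2.0"),
              ("PT_3", "DL_3", "RPT_3", "Addr3a", "APT_3.0")], "2019-10-24 08:00:00")
load_and_run([("PT_1", "DL_1", "RPT_1", "Addr1a", "APT_1.1"),   # updated attr2
              ("PT_2", "DL_2", "RPT_2", "Addr2a", "APT_2.0"),   # no changes
              ("PT_4", "DL_4", "RPT_4", "Addr4a", "APT_4.0")],  # new record
             "2019-10-25 08:00:00")

for row in conn.execute("SELECT key1, attr2, eff_ts, exp_ts, active_fl "
                        "FROM edw_snowflake_d ORDER BY key1, eff_ts"):
    print(row)
```

After the second batch, PT_1 has an expired APT_1.0 row ending at 2019-10-25 07:59:59 and an active APT_1.1 row; PT_2 keeps its single row, and PT_4 appears as a new active row.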

Upvotes: 0

RaoSK

Reputation: 11

I did the following steps to perform SCD2:

  1. Loaded the oracle_a table data into a TEMPORARY table, scd2_temp.
  2. Performed an update on snow_a to expire the changed records, joining on the key columns and comparing the remaining attributes.
  3. Inserted the records from the TEMPORARY table scd2_temp into snow_a.
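The three steps above can be sketched locally as follows. This is not the poster's actual code: it uses SQLite (Python standard library) in place of Snowflake, marks the current version with a 9999-12-31 exp_ts, uses a half-open convention where the expired row's exp_ts equals the new row's eff_ts, and assumes step 3 only inserts rows that have no current version, so unchanged records are not duplicated. All of these are assumptions.

```python
import sqlite3

HIGH_TS = "9999-12-31 23:59:59"  # assumed open-ended exp_ts marking the current version

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snow_a (key_col1 TEXT, key_col2 TEXT, key_col3 TEXT, "
             "attr1 TEXT, attr2 TEXT, eff_ts TEXT, exp_ts TEXT)")
conn.execute("CREATE TEMP TABLE scd2_temp (key_col1 TEXT, key_col2 TEXT, key_col3 TEXT, "
             "attr1 TEXT, attr2 TEXT)")


def scd2_from_extract(oracle_rows, now):
    with conn:
        # 1: load the oracle_a extract into the temporary table
        conn.execute("DELETE FROM scd2_temp")
        conn.executemany("INSERT INTO scd2_temp VALUES (?, ?, ?, ?, ?)", oracle_rows)
        # 2: expire current versions whose key matches but whose attributes differ
        conn.execute("""
            UPDATE snow_a SET exp_ts = ?
             WHERE exp_ts = ?
               AND EXISTS (SELECT 1 FROM scd2_temp t
                            WHERE t.key_col1 = snow_a.key_col1 AND t.key_col2 = snow_a.key_col2
                              AND t.key_col3 = snow_a.key_col3
                              AND (t.attr1 IS NOT snow_a.attr1 OR t.attr2 IS NOT snow_a.attr2))""",
                     (now, HIGH_TS))
        # 3: insert rows that now have no current version (new keys and just-expired keys)
        conn.execute("""
            INSERT INTO snow_a
            SELECT t.key_col1, t.key_col2, t.key_col3, t.attr1, t.attr2, ?, ?
              FROM scd2_temp t
             WHERE NOT EXISTS (SELECT 1 FROM snow_a s
                                WHERE s.key_col1 = t.key_col1 AND s.key_col2 = t.key_col2
                                  AND s.key_col3 = t.key_col3 AND s.exp_ts = ?)""",
                     (now, HIGH_TS, HIGH_TS))


scd2_from_extract([("PT_1", "DL_1", "RPT_1", "Address1", "APT_1")], "2019-10-24 00:00:00")
scd2_from_extract([("PT_1", "DL_1", "RPT_1", "Address2", "APT_1"),   # changed attr1
                   ("PT_2", "DL_2", "RPT_2", "Address1", "APT_2")],  # new key
                  "2019-10-25 00:00:00")
for row in conn.execute("SELECT * FROM snow_a ORDER BY key_col1, eff_ts"):
    print(row)
```

Expiring before inserting matters here: once step 2 has closed the old version, step 3's "no current version" filter picks up both brand-new keys and changed keys in a single INSERT.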

Upvotes: 0

mRainey

Reputation: 31

Implementing SCD Type 2 functionality on a table in Snowflake is no different from doing so in any other relational database. However, Snowflake has additional functionality that can help with this process. Please have a look at this blog post series on using Snowflake Streams and Tasks to perform the SCD logic: https://www.snowflake.com/blog/building-a-type-2-slowly-changing-dimension-in-snowflake-using-streams-and-tasks-part-1/

Cheers, Michael Rainey

Upvotes: 2

Ian Fickling

Reputation: 1

From the question, it seems that the incoming Oracle rows do not contain any SCD2-type columns, and that each row loaded into Snowflake is to be inserted using SCD2-type functionality.

SCD2 columns can have a specific meaning to the business; for example, 'exp_ts' could be the actual (system) date or a business date. A Snowflake 'Stage' does not include SCD2 functionality. This is usually the role of an ETL framework, not that of a 'fast/bulk' load utility.

Most ETL vendors have SCD2 functions as a part of their offering.

Upvotes: 0

Rachel McGuigan

Reputation: 541

OK, so here is what I found, though you may need to adjust where the update and insert come from, since oracle_a is not in Snowflake.

CREATE TABLE snowflake_a(key_col1 varchar(10), key_col2 varchar(10), key_col3 varchar(10), attr1 varchar(8), attr2 varchar(10), eff_ts TIMESTAMP, exp_ts TIMESTAMP, valid varchar(10));

-- IF EXISTS so the script also runs the first time, before oracle_a exists
DROP TABLE IF EXISTS oracle_a;
-- the current version stays open-ended (9999-12-31) until it is expired
INSERT INTO snowflake_a VALUES('PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', current_date, '9999-12-31', 'Active');

CREATE TABLE oracle_a(key_col1 varchar(10), key_col2 varchar(10), key_col3 varchar(10), attr1 varchar(8), attr2 varchar(10), eff_ts TIMESTAMP, exp_ts TIMESTAMP);

INSERT INTO oracle_a
VALUES( 'PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', '2019-10-24', '9999-12-31');

-- expire the currently active version (add a key filter when more than one key is loaded)
UPDATE snowflake_a
   SET valid = 'Expired',
       exp_ts = current_timestamp
 WHERE valid = 'Active';

SELECT * FROM snowflake_a;

INSERT INTO snowflake_a VALUES( 'PT_1', 'DL_1', 'RPT_1', 'Address1', 'APT_1', '2019-10-24', '9999-12-31', 'Active');

SELECT * FROM snowflake_a;


Or better yet, what are you using to connect your Oracle ecosystem to the Snowflake ecosystem?

Upvotes: 1
