EliSquared

Reputation: 1559

Snowflake Using Streams to Track Updates/Deletes to a Table

I am having trouble understanding how Streams work in terms of tracking changes. I would like to create a history table that tracks every UPDATE and DELETE to a table, but I am finding I do not understand how this works.

If I have table Table1 with a Stream:

 CREATE TABLE Table1
 (
   XID INT IDENTITY PRIMARY KEY,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
 );

 CREATE STREAM Table1_History ON TABLE Table1;

If I insert data:

INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;

Then run:

SELECT * FROM Table1_History;

It returns the following:

XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
1    101     String1  2020-08-13 06:52:34.402  INSERT           FALSE              23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2    102     String2  2020-08-13 06:52:34.402  INSERT           FALSE              5b5e429cf3a174303b2f2192b5d602ed9dedd865

So far so good.

But if I run:

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

Then select from Table1_History, I get:

SELECT * FROM Table1_History;

XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
1    1001    String1  2020-08-13 06:52:34.402  INSERT           FALSE              23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2    102     String2  2020-08-13 06:52:34.402  INSERT           FALSE              5b5e429cf3a174303b2f2192b5d602ed9dedd865

The METADATA$ACTION is still INSERT, and the FIELD1 value is now stored in the stream as 1001. There is no longer any record I can see that the row used to have a value of 101 and that it was updated.

If I run the following:

DELETE FROM Table1 WHERE XID = 2;

The stream now returns:

SELECT * FROM Table1_History;

XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
1    1001    String1  2020-08-13 06:52:34.402  INSERT           FALSE              23bc7a4d83522484f4d7e36edf84b4c7986dfa9b

There are now no records on the stream showing that the second row was ever in the table.

I don't get the point of a stream for tracking UPDATEs/DELETEs. Is this not what streams are for?

I tried following this: Snowflake Streams Made Simple, but I still don't understand.

Upvotes: 4

Views: 7692

Answers (3)

Benny

Reputation: 1

Streams are not tables.

A stream only records the delta since you last consumed it (or since it was created). What you are seeing is that you keep changing the data before consuming the stream, so every change is folded into the same delta. The initial inserts are simply what has changed since the stream's offset. Once you write the stream's contents to another table, the offset advances and later changes are measured from that new point.

I suggest you create a history table: create table Table_history (...)

Do your initial inserts into Table1 and look at the stream: select * from Table1_History;

Now insert into your history table from the stream you defined:

insert into Table_history (...) select * from Table1_History;

Check the stream again: select * from Table1_History; The data should be gone, because the insert consumed it.

Then do the update: UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

Check the stream again: select * from Table1_History; The update should now appear.
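The steps above can be sketched as a single script. This is only an illustration: the column list for Table_history is an assumption (the answer leaves it as `(...)`), chosen to match what `select *` on the stream returns, including the three METADATA$ columns.

```sql
-- Hypothetical history table; the columns are an assumption that mirrors the stream's output.
create table Table_history (
    XID INT,
    FIELD1 INT,
    FIELD2 STRING,
    DATECREATED TIMESTAMP,
    METADATA$ACTION STRING,
    METADATA$ISUPDATE BOOLEAN,
    METADATA$ROW_ID STRING
);

-- 1. The initial inserts show up in the stream (Table1_History is the stream from the question).
select * from Table1_History;

-- 2. Reading the stream inside a DML statement consumes it and advances its offset.
insert into Table_history
select XID, FIELD1, FIELD2, DATECREATED,
       METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
from Table1_History;

-- 3. The stream should now return no rows.
select * from Table1_History;

-- 4. A later change is measured from the new offset.
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
select * from Table1_History;
```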

Upvotes: 0

NickW

Reputation: 9768

To quote the Snowflake documentation: "A stream stores the current transactional version of a table and is the appropriate source of CDC records in most scenarios."

Have a look at this example in the Snowflake documentation: https://docs.snowflake.com/en/user-guide/streams.html#example-1

My understanding is that a stream will only hold the current version of a record until you advance the offset. So if you insert a record and then update it, before advancing the offset, then it will show a single insert but the fields will hold the latest values.

If you then advance the offset and update or delete the record then those events will show in the stream - though if you updated and then deleted the same record (before advancing the offset) the stream would just show the delete, as that's the last position for that record.
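A quick way to check this behaviour yourself, assuming Table1 and the stream Table1_History from the question and that the stream has already been consumed once, might look like this. The `FIELD2` value is just an illustrative choice.

```sql
-- Two changes to the same row before the stream is consumed again.
UPDATE Table1 SET FIELD2 = 'Changed' WHERE XID = 1;
DELETE FROM Table1 WHERE XID = 1;

-- Per the description above, the stream should report only the net effect
-- for XID = 1 rather than one entry per statement.
SELECT XID, FIELD1, FIELD2, METADATA$ACTION, METADATA$ISUPDATE
FROM Table1_History;
```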

UPDATE 1: It sounds like you are trying to implement audit tracking for every change made to a record in a table. This is not what Streams are designed to do, and I don't think you would be able to implement a solution, using Streams, that is guaranteed to log every change.

If you read the Streams documentation it states "The stream can provide the set of changes from the current offset to the current transactional time of the source table (i.e. the current version of the table). The stream maintains only the delta of the changes; if multiple DML statements change a row, the stream contains only the latest action taken on that row."

CDC is a term specifically related to loading data warehouses and was never meant as a generic term for capturing every change made to a record.

If you want to create a genuine auditing capability in Snowflake then I'm afraid I don't know if that is possible. The time travel feature shows that Snowflake retains all the changes made to a record (within the retention period), but I'm not aware of any way of accessing just these changes; I think you can only access the history of a record at points in time, and you have no way of knowing at what times any changes were made.

UPDATE 2: I just realised that Snowflake allows Change Tracking on a table without necessarily using Streams. This is probably a better solution if you want to capture all changes to a table, not just the latest version. The functionality is documented here: https://docs.snowflake.com/en/sql-reference/constructs/changes.html
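A minimal sketch of the CHANGES clause against the question's Table1 is shown below. The session variable `ts1` is a hypothetical name, and the UPDATE is only there to produce a change to look at.

```sql
-- Change tracking must be enabled on the table before the interval you want to query.
ALTER TABLE Table1 SET CHANGE_TRACKING = TRUE;

-- Remember a starting point (ts1 is a hypothetical session variable).
SET ts1 = (SELECT CURRENT_TIMESTAMP());

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

-- Changes made to Table1 since ts1, with the same METADATA$ columns a stream exposes.
SELECT *
FROM Table1
  CHANGES(INFORMATION => DEFAULT)
  AT(TIMESTAMP => $ts1);
```

One caveat worth verifying against the linked documentation: as I read it, CHANGES with `INFORMATION => DEFAULT` also reports the net delta over the interval, so seeing every intermediate version of a row would still mean querying successive, narrower intervals.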

Upvotes: 4

EliSquared

Reputation: 1559

As @NickW stated, a stream is about tracking changes between offsets. This means I can still do what I want, but it requires an explicit INSERT into a history table between DML operations.

First create the main Table, the stream, and the History table:

CREATE TABLE Table1
(
   XID INT IDENTITY PRIMARY KEY,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);

CREATE STREAM Table1_Stream ON TABLE Table1;

CREATE TABLE Table1_History
(
   UID INT IDENTITY PRIMARY KEY,
   XID INT,
   FIELD1 INT,
   FIELD2 STRING,
   DATECREATED TIMESTAMP,
   METADATA$ACTION STRING,   --METADATA Column from Stream
   METADATA$ISUPDATE STRING, --METADATA Column from Stream
   DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);

Then INSERT records:

INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;

The stream now reads:

SELECT * FROM Table1_Stream;

XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
1    101     String1  2020-08-13 06:52:34.402  INSERT           FALSE              23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2    102     String2  2020-08-13 06:52:34.402  INSERT           FALSE              5b5e429cf3a174303b2f2192b5d602ed9dedd865

Then do an INSERT from the stream into the history table:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE';

Note the WHERE clause, which excludes plain INSERT records but keeps an INSERT that is part of an UPDATE (the stream represents an UPDATE as a DELETE followed by an INSERT).

Even though the WHERE clause meant that no stream records were actually inserted into the history table, the stream is now empty, because reading it in a DML statement advanced its offset:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

Now if you do an UPDATE the stream will show it:

UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;

SELECT * FROM Table1_Stream;

XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
1    1001    String1  2020-08-14 09:11:20.173  INSERT           TRUE               93256f240f338581cc4781c2e79a28075e1b66d7
1    101     String1  2020-08-14 09:11:20.173  DELETE           TRUE               93256f240f338581cc4781c2e79a28075e1b66d7

Now run the insert:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE';

SELECT * FROM Table1_History;

UID  XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  DATEINSERTED
1    1    1001    String1  2020-08-14 09:11:20.173  INSERT           TRUE               2020-08-14 09:13:41.474
2    1    101     String1  2020-08-14 09:11:20.173  DELETE           TRUE               2020-08-14 09:13:41.474

And the stream is empty again:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID

If you run a DELETE, the change is again reflected in the Stream:

DELETE FROM Table1 WHERE XID = 2;

SELECT * FROM Table1_Stream;

XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  METADATA$ROW_ID
2    102     String2  2020-08-14 09:11:20.173  DELETE           FALSE              51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3

And it can be inserted into the history table:

INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE';

SELECT * FROM Table1_History;

UID  XID  FIELD1  FIELD2   DATECREATED              METADATA$ACTION  METADATA$ISUPDATE  DATEINSERTED
1    1    1001    String1  2020-08-14 09:11:20.173  INSERT           TRUE               2020-08-14 09:13:41.474
2    1    101     String1  2020-08-14 09:11:20.173  DELETE           TRUE               2020-08-14 09:13:41.474
4    2    102     String2  2020-08-14 09:11:20.173  DELETE           FALSE              2020-08-14 09:17:37.694

And the stream is empty again:

SELECT * FROM Table1_Stream;

XID FIELD1  FIELD2  DATECREATED METADATA$ACTION METADATA$ISUPDATE   METADATA$ROW_ID
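Once the history table holds these rows, the before and after images of each UPDATE can be lined up with a self-join. This is only a sketch: it pairs rows on XID and DATEINSERTED, which assumes the two halves of an update were written by the same INSERT statement and therefore share a DATEINSERTED value, as they do in the output above. The aliases `old_row` and `new_row` and the output column names are my own.

```sql
-- Pair the DELETE (old values) and INSERT (new values) halves of each captured UPDATE.
SELECT old_row.XID,
       old_row.FIELD1       AS OLD_FIELD1,
       new_row.FIELD1       AS NEW_FIELD1,
       old_row.FIELD2       AS OLD_FIELD2,
       new_row.FIELD2       AS NEW_FIELD2,
       old_row.DATEINSERTED AS CAPTURED_AT
FROM Table1_History AS old_row
JOIN Table1_History AS new_row
  ON  new_row.XID = old_row.XID
  AND new_row.DATEINSERTED = old_row.DATEINSERTED
WHERE old_row.METADATA$ACTION = 'DELETE'
  AND new_row.METADATA$ACTION = 'INSERT'
  AND old_row.METADATA$ISUPDATE = 'TRUE'
  AND new_row.METADATA$ISUPDATE = 'TRUE'
ORDER BY CAPTURED_AT;
```

For the sample data this should pair the 101 and 1001 values for XID 1. Storing METADATA$ROW_ID in the history table as well would probably give a more robust join key.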

The one thing I don't understand is why the UID on Table1_History incremented to 4 instead of 3, but that is a trivial problem.

This is how I will track all historical changes in Snowflake.
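Rather than running the INSERT by hand, it could be scheduled with a Snowflake task that fires only when the stream has data. This is a sketch under assumptions: the task name `Table1_History_Task` and the warehouse `MY_WH` are hypothetical, and the one-minute schedule is an arbitrary choice.

```sql
-- Hypothetical task and warehouse names; adjust to your environment.
CREATE TASK Table1_History_Task
  WAREHOUSE = MY_WH
  SCHEDULE = '1 MINUTE'
WHEN
  SYSTEM$STREAM_HAS_DATA('TABLE1_STREAM')
AS
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
                           METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
       METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE';

-- Tasks are created suspended and must be resumed before they run.
ALTER TASK Table1_History_Task RESUME;
```

Note that this does not change the stream's semantics: several changes to the same row between two task runs would still be collapsed into one net change.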

Upvotes: 0
