Reputation: 1559
I am having trouble understanding how Streams work in terms of tracking changes. I would like to create a history table that tracks every UPDATE
and DELETE
to a table, but I am finding I do not understand how this works.
If I have table Table1
with a Stream:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_History ON TABLE Table1;
If I insert data:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
Then run:
SELECT * FROM Table1_History;
It returns the following:
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
So far so good.
But if I run:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
Then select from Table1_History
, I get:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
The METADATA$ACTION
is still INSERT
, and the FIELD1
value is now stored in the stream as 1001
. There is no longer any record I can see that the row used to have a value of 101
and that it was updated.
If I run the following:
DELETE FROM Table1 WHERE XID = 2;
The stream now returns:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
There is now 0 records I can see on the stream of the second row ever being in the database.
I dont get the point of the Stream table for tracking UPDATES/DELETES
. Is this not the use of streams?
I tried following this: Snowflake Streams Made Simple, but I still dont understand.
Upvotes: 4
Views: 7692
Reputation: 1
Streams are not tables.
A stream only measures the delta since you last consumed (or initially defined) the stream. What you are currently seeing is you just keep overwriting data before you take another snapshot via the stream. The initial insert values are what has changed since last time you captured the data. By writing off the insert values to another table, a new snapshot is created from which to measure change from.
I suggest you create a table: create table Table_history (...)
do your initial inserts to table1 and look at the stream: (select * from Table1_History);
Now insert to your history table from the STREAM you defined:
insert into table_history (...) select * from Table1_History ;
Check the stream again: select * from Table1_History The data should be gone. Nothing.
Then do the update: UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
Check the stream again: select * from Table1_History
Upvotes: 0
Reputation: 9768
To quote the Snowflake documentation: "A stream stores the current transactional version of a table and is the appropriate source of CDC records in most scenarios."
Have a look at this example in the Snowflake documentation: https://docs.snowflake.com/en/user-guide/streams.html#example-1
My understanding is that a stream will only hold the current version of a record until you advance the offset. So if you insert a record and then update it, before advancing the offset, then it will show a single insert but the fields will hold the latest values.
If you then advance the offset and update or delete the record then those events will show in the stream - though if you updated and then deleted the same record (before advancing the offset) the stream would just show the delete, as that's the last position for that record.
UPDATE 1 It sounds like you are trying to implement audit tracking for every change made to a record in a table - this is not what Streams are designed to do and I don't think you would be able to implement a solution, using Streams, that guaranteed to log every change.
If you read the Streams documentation it states "The stream can provide the set of changes from the current offset to the current transactional time of the source table (i.e. the current version of the table). The stream maintains only the delta of the changes; if multiple DML statements change a row, the stream contains only the latest action taken on that row."
CDC is a terminology specifically related to loading data warehouses and is never meant as a generic term for capturing every change made to a record.
If you want to create a genuine auditing capability in Snowflake then I'm afraid I don't know if that is possible. The time travel feature shows that Snowflake retains all the changes made to a record (within the retention period) but I'm not aware of any way of accessing just these changes; I think you can only access the history of a record at points in time and you have no way of knowing at what times any changes were made
UPDATE 2 Just realised that Snowflake allows Change Tracking on a table without necessarily using Streams. This is probably a better solution if you want to capture all changes to a table, not just the latest version. The functionality is documented here: https://docs.snowflake.com/en/sql-reference/constructs/changes.html
Upvotes: 4
Reputation: 1559
Alright so as @NickW stated the streams table is more about tracking changes between offsets. This means I can still do what I want to do, but it will require an explicit INSERT
into a History table between DML
operations.
First create the main Table, the stream, and the History table:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_Stream ON TABLE Table1;
CREATE TABLE Table1_History
(
UID INT IDENTITY PRIMARY KEY,
XID INT,
FIELD1 INT,
FIELD2 STRING,
DATECREATED TIMESTAMP,
METADATA$ACTION STRING, --METADATA Column from Stream
METADATA$ISUPDATE STRING, --METADATA Column from Stream
DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
Then INSERT
records:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),
(102,'String2')
;
The Stream Table now reads:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
Then do an INSERT
from the Streams table to the History table:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
Note the WHERE
clause which eliminates INSERT
only records unless it is part of an UPDATE
, which DELETEs
then INSERTs
a record.
Now, even though the stream records were not actually inserted into the history table because of the WHERE
clause, if you query the Stream you get NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
Now if you do an UPDATE
the stream will show it:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
Now run the insert:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
And the Stream is NULL again:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
If you run a DELETE
, the change is again reflected in the Stream:
DELETE FROM Table1 WHERE XID = 2
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3
And it can be INSERT
into the History table:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,FIELD1,FIELD2,DATECREATED,
METADATA$ACTION,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
4 2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 2020-08-14 09:17:37.694
And the Stream is NULL again:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
The one thing I don't understand is why the UID
on Table1_History
incremented to 4
instead of 3
, but that is a trivial problem.
This is how I will track all historical changes in Snowflake.
Upvotes: 0