Asher

Reputation: 193

How to transform incremental update data into a structured table in Snowflake

I have the following incremental transactional data coming from S3 in AVRO format.

{
  "after": {
    "COM_PCT": null,
    "DEPT_ID": 30,
    "EMAIL": "AKHOO",
    "EMPLOYEE_ID": 115,
    "FIRST_NAME": "ALEX",
    "LAST_NAME": "TIM",
    "HIRE": "1995-05-18 00:00:00",
    "MANAGER_ID": 114
  },
  "before": {},
  "current_ts": "2018-05-18 00:00:00:00",
  "op_ts": "2018-05-18 00:00:00:00",
  "op_type": "I",
  "pos": "00000000001123",
  "primary_keys": ["EMPLOYEE_ID"],
  "table": "HR.EMPLOYEE"
},
{
  "after": {
    "COM_PCT": null,
    "DEPT_ID": 11,
    "EMAIL": "AKHOO",
    "EMPLOYEE_ID": 115,
    "FIRST_NAME": "ALEX",
    "LAST_NAME": "TIM",
    "HIRE": "1995-05-18 00:00:00",
    "MANAGER_ID": 114
  },
  "before": {},
  "current_ts": "2018-05-19 00:00:00:00",
  "op_ts": "2018-05-19 00:00:00:00",
  "op_type": "U",
  "pos": "00000000001124",
  "primary_keys": ["EMPLOYEE_ID"],
  "table": "HR.EMPLOYEE"
},
{
  "after": {
    "COM_PCT": null,
    "DEPT_ID": 30,
    "EMAIL": "AKHOO",
    "EMPLOYEE_ID": 115,
    "FIRST_NAME": "ALEX",
    "LAST_NAME": "TIM",
    "HIRE": "1995-05-18 00:00:00",
    "MANAGER_ID": 114
  },
  "before": {},
  "current_ts": "2018-05-20 00:00:00:00",
  "op_ts": "2018-05-20 00:00:00:00",
  "op_type": "U",
  "pos": "00000000001125",
  "primary_keys": ["EMPLOYEE_ID"],
  "table": "HR.EMPLOYEE"
}

The first transaction is an insert for this primary key; the next two are updates. I cannot use a Stream or Pipe to handle the incremental updates. Is there a way to convert this into a structured table that shows only the latest insert/update transaction for each primary key?

Upvotes: 1

Views: 192

Answers (1)

Michael Golos

Reputation: 2069

I assumed that the file contains changes for only one table; if not, you first need to filter the changes down to the specific table. I also assumed that the data lands in a VARIANT column, but that has no effect on the solution itself, only on how you extract the fields.

I suggest this solution:

  • At the very beginning, use the QUALIFY clause to filter the data down to the latest version of each record (see the short sketch after this list).
  • Then perform a MERGE operation to insert, update, or delete the record.
  • If your feed also contains DELETE operations, they should be handled in the MERGE as well.
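
As a side note, QUALIFY behaves like HAVING but for window functions: it filters rows on a window-function result without a wrapping subquery. The two queries below are equivalent (src here stands for any table or CTE shaped like the one in the solution):

SELECT *
  FROM src
QUALIFY ROW_NUMBER() OVER (PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1;

-- pre-QUALIFY equivalent: materialize the row number, then filter on it
-- (the rn column remains in the output here)
SELECT *
  FROM (
        SELECT s.*
             , ROW_NUMBER() OVER (PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) AS rn
          FROM src AS s
       )
 WHERE rn = 1;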

Sample data:

CREATE OR REPLACE TABLE SAMPLE_RAW (samples variant);                  

INSERT INTO SAMPLE_RAW
SELECT parse_json('{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 30,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-18 00:00:00:00",
    "op_ts": "2018-05-18 00:00:00:00",
    "op_type": "I",
    "pos": "00000000001123",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  }')
UNION ALL
SELECT parse_json('{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 11,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-19 00:00:00:00",
    "op_ts": "2018-05-19 00:00:00:00",
    "op_type": "U",
    "pos": "00000000001124",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  }')
UNION ALL
SELECT parse_json('{
    "after": {
      "COM_PCT": null,
      "DEPT_ID": 30,
      "EMAIL": "AKHOO",
      "EMPLOYEE_ID": 115,
      "FIRST_NAME": "ALEX",
      "LAST_NAME": "TIM",
      "HIRE": "1995-05-18 00:00:00",
      "MANAGER_ID": 114
    },
    "before": {},
    "current_ts": "2018-05-20 00:00:00:00",
    "op_ts": "2018-05-20 00:00:00:00",
    "op_type": "U",
    "pos": "00000000001125",
    "primary_keys": ["EMPLOYEE_ID"],
    "table": "HR.EMPLOYEE"
  }');
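
Note that the MERGE in the solution targets HR.EMPLOYEE, which is not defined anywhere above. A possible definition, with column types inferred from the sample data (the types are assumptions, adjust them to your real schema), would be:

CREATE TABLE IF NOT EXISTS HR.EMPLOYEE (
    EMPLOYEE_ID NUMBER NOT NULL,  -- listed in "primary_keys"
    COM_PCT     NUMBER(5,2),      -- always null in the sample, numeric is a guess
    DEPT_ID     NUMBER,
    EMAIL       VARCHAR,
    FIRST_NAME  VARCHAR,
    LAST_NAME   VARCHAR,
    HIRE        TIMESTAMP_NTZ,
    MANAGER_ID  NUMBER
);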

Solution:

WITH src AS (
  SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF') AS op_ts
       , s.samples:op_type::string AS op_type
       , s.samples:after:COM_PCT::string AS COM_PCT
       , s.samples:after:DEPT_ID AS DEPT_ID
       , s.samples:after:EMAIL::string AS EMAIL
       , s.samples:after:EMPLOYEE_ID AS EMPLOYEE_ID
       , s.samples:after:FIRST_NAME::string AS FIRST_NAME
       , s.samples:after:LAST_NAME::string AS LAST_NAME
       , TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') AS HIRE
       , s.samples:after:MANAGER_ID AS MANAGER_ID
    FROM SAMPLE_RAW AS s
 -- keep only the latest change per primary key
 QUALIFY ROW_NUMBER() OVER(PARTITION BY EMPLOYEE_ID ORDER BY op_ts DESC) = 1
)
MERGE INTO HR.EMPLOYEE AS trg
USING src ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID
WHEN MATCHED AND src.op_type = 'U' THEN UPDATE
     SET trg.COM_PCT    = src.COM_PCT
       , trg.DEPT_ID    = src.DEPT_ID
       , trg.EMAIL      = src.EMAIL
       , trg.FIRST_NAME = src.FIRST_NAME
       , trg.LAST_NAME  = src.LAST_NAME
       , trg.HIRE       = src.HIRE
       , trg.MANAGER_ID = src.MANAGER_ID
WHEN MATCHED AND src.op_type = 'D' THEN DELETE
WHEN NOT MATCHED THEN INSERT (EMPLOYEE_ID, COM_PCT, DEPT_ID, EMAIL, FIRST_NAME, LAST_NAME, HIRE, MANAGER_ID)
     VALUES (src.EMPLOYEE_ID, src.COM_PCT, src.DEPT_ID, src.EMAIL, src.FIRST_NAME, src.LAST_NAME, src.HIRE, src.MANAGER_ID);
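
A quick check after the merge: the target should contain a single row for EMPLOYEE_ID 115 reflecting the latest change (the 2018-05-20 update, so DEPT_ID = 30):

SELECT *
  FROM HR.EMPLOYEE
 WHERE EMPLOYEE_ID = 115;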

Reference: QUALIFY, MERGE

Upvotes: 1
