Reputation: 193
I have the following incremental transactional data coming from S3 in AVRO format.
{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-18 00:00:00:00",
"op_ts": "2018-05-18 00:00:00:00",
"op_type": "I",
"pos": "00000000001123",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
},
{
"after": {
"COM_PCT": null,
"DEPT_ID": 11,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-19 00:00:00:00",
"op_ts": "2018-05-19 00:00:00:00",
"op_type": "U",
"pos": "00000000001124",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
},
{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-20 00:00:00:00",
"op_ts": "2018-05-20 00:00:00:00",
"op_type": "U",
"pos": "00000000001125",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}
The first transaction is an insert for that primary key; the next two are update transactions. I cannot use a stream/pipe to handle incremental updates. Is there a way to convert this into a structured table and show only the latest insert/update transaction for each primary key?
Upvotes: 1
Views: 192
Reputation: 2069
I assumed that the file contains changes for only one table; if not, you first need to filter the changes down to the specific table. I also assumed that the data lands in a VARIANT column, but that does not affect how you extract the data.
I suggest this solution:
Sample data:
-- Fixture: raw CDC events, one event per row, stored as VARIANT.
CREATE OR REPLACE TABLE SAMPLE_RAW (samples variant);

-- Load the three change records for EMPLOYEE_ID 115:
-- one insert ('I') followed by two updates ('U').
INSERT INTO SAMPLE_RAW (samples)
SELECT PARSE_JSON($1)
FROM VALUES
('{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-18 00:00:00:00",
"op_ts": "2018-05-18 00:00:00:00",
"op_type": "I",
"pos": "00000000001123",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}'),
('{
"after": {
"COM_PCT": null,
"DEPT_ID": 11,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-19 00:00:00:00",
"op_ts": "2018-05-19 00:00:00:00",
"op_type": "U",
"pos": "00000000001124",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}'),
('{
"after": {
"COM_PCT": null,
"DEPT_ID": 30,
"EMAIL": "AKHOO",
"EMPLOYEE_ID": 115,
"FIRST_NAME": "ALEX",
"LAST_NAME": "TIM",
"HIRE": "1995-05-18 00:00:00",
"MANAGER_ID": 114
},
"before": {},
"current_ts": "2018-05-20 00:00:00:00",
"op_ts": "2018-05-20 00:00:00:00",
"op_type": "U",
"pos": "00000000001125",
"primary_keys": ["EMPLOYEE_ID"],
"table": "HR.EMPLOYEE"
}');
Solution — flatten the VARIANT, keep only the newest event per primary key, and apply it with a MERGE:
-- Flatten the raw VARIANT change records, keep only the newest event per
-- primary key, then apply that single event to the target with one MERGE.
WITH src AS (
    SELECT TO_TIMESTAMP(s.samples:op_ts::string, 'YYYY-MM-DD HH24:MI:SS:FF')   AS op_ts
         , s.samples:op_type::string                                           AS op_type
         , s.samples:pos::string                                               AS pos
         -- NOTE(review): COM_PCT looks numeric (a percentage) but is kept as
         -- string here; confirm against the target column's type.
         , s.samples:after:COM_PCT::string                                     AS COM_PCT
         , s.samples:after:DEPT_ID::number                                     AS DEPT_ID
         , s.samples:after:EMAIL::string                                       AS EMAIL
         , s.samples:after:EMPLOYEE_ID::number                                 AS EMPLOYEE_ID
         , s.samples:after:FIRST_NAME::string                                  AS FIRST_NAME
         , s.samples:after:LAST_NAME::string                                   AS LAST_NAME
         , TO_TIMESTAMP(s.samples:after:HIRE::string, 'YYYY-MM-DD HH24:MI:SS') AS HIRE
         , s.samples:after:MANAGER_ID::number                                  AS MANAGER_ID
    FROM SAMPLE_RAW AS s
    -- Latest change wins; pos (the log position) breaks ties when two
    -- events carry the same op_ts.
    QUALIFY ROW_NUMBER() OVER (PARTITION BY EMPLOYEE_ID
                               ORDER BY op_ts DESC, pos DESC) = 1
)
MERGE INTO HR.EMPLOYEE AS trg
USING src
   ON trg.EMPLOYEE_ID = src.EMPLOYEE_ID
-- 'I' is included so a replayed insert for an already-existing key is
-- applied as an update instead of being silently dropped.
WHEN MATCHED AND src.op_type IN ('I', 'U') THEN UPDATE SET
     trg.COM_PCT    = src.COM_PCT
   , trg.DEPT_ID    = src.DEPT_ID
   , trg.EMAIL      = src.EMAIL
   , trg.FIRST_NAME = src.FIRST_NAME
   , trg.LAST_NAME  = src.LAST_NAME
   , trg.HIRE       = src.HIRE
   , trg.MANAGER_ID = src.MANAGER_ID
WHEN MATCHED AND src.op_type = 'D' THEN DELETE
-- Do not resurrect a row whose latest event is a delete.
WHEN NOT MATCHED AND src.op_type <> 'D' THEN INSERT
     (COM_PCT, DEPT_ID, EMAIL, EMPLOYEE_ID, FIRST_NAME, LAST_NAME, HIRE, MANAGER_ID)
     VALUES (src.COM_PCT, src.DEPT_ID, src.EMAIL, src.EMPLOYEE_ID,
             src.FIRST_NAME, src.LAST_NAME, src.HIRE, src.MANAGER_ID);
Upvotes: 1