Aniket Chaudhuri
Aniket Chaudhuri

Reputation: 3

Snowflake SQL to Extract Data from snowflake.account_usage.copy_history view as Delta Load

I have a requirement where I'm trying to extract all the snowflake's copy history data from snowflake.account_usage.copy_history view as a delta load with a control table to store the current execution date and doing a delta based on last_load_time from the Snowflake view.

But the merge query is not supported with cte's. I can just select which I need to although. So only the select is working but I'm not able to update my control table with the latest timestamp

   WITH LastLoad AS (
    SELECT
        MAX(LAST_TIMESTAMP) AS last_load_time
    FROM
        edw_dev.meta.adf_load_control
    where
        source_entity = 'COPY_HISTORY'
),
CopyHistoryDelta AS (
    SELECT
        ch.*,
        CURRENT_TIMESTAMP AS updated_load_time
    FROM
        snowflake.account_usage.copy_history ch
        INNER JOIN LastLoad ll ON ch.LAST_LOAD_TIME > ll.last_load_time
) 
MERGE INTO edw_dev.meta.adf_load_control ut 
USING (
    SELECT
        updated_load_time
    FROM
        CopyHistoryDelta
) AS new_data 
ON ut.source_entity = 'COPY_HISTORY'
WHEN MATCHED 
    THEN
        UPDATE
        SET
            LAST_TIMESTAMP = new_data.updated_load_time
WHEN NOT MATCHED 
    THEN
        INSERT
            (last_load_time)
        VALUES
            (new_data.updated_load_time);

Sorry the formatting seems to be not working

Upvotes: 0

Views: 179

Answers (1)

ADITYA PAWAR
ADITYA PAWAR

Reputation: 695

In snowflake merge you have to use cte inside using clause itself like below,

MERGE INTO edw_dev.meta.adf_load_control ut 
USING (
    WITH LastLoad AS (
        SELECT
            MAX(LAST_TIMESTAMP) AS last_load_time
        FROM
            edw_dev.meta.adf_load_control
        where
            source_entity = 'COPY_HISTORY'
    ),
    CopyHistoryDelta AS (
        SELECT
            ch.*,
            CURRENT_TIMESTAMP AS updated_load_time
        FROM
            snowflake.account_usage.copy_history ch
            INNER JOIN LastLoad ll ON ch.LAST_LOAD_TIME > ll.last_load_time
    ) 
    SELECT
        updated_load_time
    FROM
        CopyHistoryDelta
) AS new_data 
ON ut.source_entity = 'COPY_HISTORY'
WHEN MATCHED 
    THEN
        UPDATE
        SET
            LAST_TIMESTAMP = new_data.updated_load_time
WHEN NOT MATCHED 
    THEN
        INSERT
            (last_load_time)
        VALUES
            (new_data.updated_load_time);

Upvotes: 0

Related Questions