Marco Roy

Replaying outstanding Snowpipe notifications/messages in Snowflake

When a pipe is re-created, some notifications can be missed. Is there any way to replay these missed notifications? Refreshing the pipe is dangerous (so not an option): the load history is lost when the pipe is re-created, so a refresh could ingest the same files twice and create duplicate records.

Snowflake has documented a process for re-creating pipes with automated data loading (link). Unfortunately, any new notifications that arrive between step 1 (pause the pipe) and step 3 (re-create the pipe) can be missed. Automating the process with a procedure shrinks the window but cannot eliminate it; I have confirmed this with multiple tests. Even without pausing the previous pipe, there is still a slim chance of this happening.

However, Snowflake does receive these notifications, since the notification queue is separate from the pipes (and shared across the entire account). The notifications received at the "wrong" time are simply never processed, which I guess makes sense if no active pipe exists to process them at that time.

I think those notifications are reflected in the numOutstandingMessagesOnChannel property of the pipe status (as returned by SYSTEM$PIPE_STATUS), but I can't find much more information about it, nor any way to get those notifications processed. They might simply be lost when the pipe is replaced. 😞
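For reference, here is a minimal JavaScript sketch of reading those fields. It assumes the JSON string returned by SELECT SYSTEM$PIPE_STATUS('my_db.my_schema.my_pipe') has already been fetched; summarizePipeStatus, looksDrained and the sample JSON are hypothetical, and since the notification channel may be shared, a non-zero message count does not necessarily belong to this particular pipe.

```javascript
// Hypothetical helper: summarize the JSON string returned by
// SYSTEM$PIPE_STATUS (fetched separately, e.g. through a SQL client).
function summarizePipeStatus(statusJson) {
  const status = JSON.parse(statusJson);
  return {
    paused: status.executionState === 'PAUSED',
    // Files already queued for loading by this pipe
    pendingFiles: status.pendingFileCount || 0,
    // Messages still waiting on the notification channel
    outstandingMessages: status.numOutstandingMessagesOnChannel || 0,
  };
}

// Heuristic (assumption): only re-create once the pipe is paused and
// nothing is pending or outstanding.
function looksDrained(statusJson) {
  const s = summarizePipeStatus(statusJson);
  return s.paused && s.pendingFiles === 0 && s.outstandingMessages === 0;
}

// Hypothetical sample output, for illustration only
const sample =
  '{"executionState":"PAUSED","pendingFileCount":0,"numOutstandingMessagesOnChannel":3}';
console.log(looksDrained(sample)); // false
```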

Note: This is related to another question I asked about preserving the load history when re-creating pipes in Snowflake (link).

Answers (1)

Marco Roy

Assuming there's no way to replay outstanding notifications, I've instead created a procedure to detect files that failed to load automatically. A benefit of this approach is that it also detects files that failed to load for any other reason (not only missed notifications).

The procedure can be called like this:

CALL verify_pipe_load(
  'my_db.my_schema.my_pipe', -- Pipe name
  'my_db.my_schema.my_stage', -- Stage name
  'my_db.my_schema.my_table', -- Table name
  '/YYYY/MM/DD/HH/', -- File prefix
  'YYYY-MM-DD', -- Start time for the loads
  'ERROR' -- Mode
);
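If the procedure is scheduled hourly, the PREFIX and START_TIME arguments can be derived from the clock. A minimal sketch, assuming files are staged under /YYYY/MM/DD/HH/ paths in UTC; hourlyArgs is a hypothetical helper, and the explicit +00:00 offset assumes Snowflake's automatic timestamp format detection accepts it (which keeps the ::TIMESTAMP_LTZ cast independent of the session time zone).

```javascript
// Hypothetical helper: PREFIX and START_TIME arguments for
// verify_pipe_load() covering one UTC hour (assumes /YYYY/MM/DD/HH/ paths).
function hourlyArgs(date) {
  const pad = n => String(n).padStart(2, '0');
  const y = date.getUTCFullYear();
  const m = pad(date.getUTCMonth() + 1);
  const d = pad(date.getUTCDate());
  const h = pad(date.getUTCHours());
  return {
    prefix: `/${y}/${m}/${d}/${h}/`,
    // Start of the same hour, with an explicit UTC offset (assumes Snowflake
    // auto-detects this format when casting to TIMESTAMP_LTZ)
    startTime: `${y}-${m}-${d} ${h}:00:00 +00:00`,
  };
}

// Check the previous hour, giving its files time to be loaded
const previousHour = new Date(Date.now() - 60 * 60 * 1000);
const { prefix, startTime } = hourlyArgs(previousHour);
console.log(
  `CALL verify_pipe_load('my_db.my_schema.my_pipe', 'my_db.my_schema.my_stage', ` +
  `'my_db.my_schema.my_table', '${prefix}', '${startTime}', 'ERROR');`
);
```

Running it comfortably later than the 5-minute base delay past the hour (say, 10+ minutes) matters: files modified at the very end of the previous hour are skipped until their estimated ingest time has passed, and an hourly run would not look at that prefix again.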

Here's how it works, at a high level:

  • First, it lists all the files in the stage that match the specified prefix (using the LIST command), skipping files modified too recently to have been loaded yet (5 minutes, plus roughly one minute per GB, after their last modification).
  • Then, out of those files, it finds all of those that have no record in COPY_HISTORY for that pipe.
  • Finally, it handles those missing file loads in one of three ways, depending on the mode:
    • The 'ERROR' mode will abort the procedure by throwing an exception. This is useful to automate the continuous monitoring of pipes and ensure no files are missed. Just hook it up to your automation tool of choice! We use DBT + DBT Cloud.
    • The 'INGEST' mode will automatically re-queue the files for ingestion by Snowpipe, running ALTER PIPE ... REFRESH with each missing file's path as the PREFIX, so only those specific files are reloaded.
    • The 'RETURN' mode will simply return the list of files in the response.
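The steps above can be sketched in plain JavaScript on in-memory data, which makes the filtering logic easy to test outside Snowflake. This is an illustration, not the procedure itself: findMissingFiles and the object shapes (name/size/lastModified for LIST rows, stageLocation/fileName for COPY_HISTORY rows) are hypothetical, and it mirrors the procedure's assumption that stage_location || file_name equals the name returned by LIST.

```javascript
// In-memory sketch of the detection step (hypothetical data shapes):
// `staged` mimics LIST output, `loaded` mimics COPY_HISTORY rows.
const GB = 1024 ** 3;

function findMissingFiles(staged, loaded, stageLocation, now) {
  // Full URLs of files the pipe has loaded (stage_location || file_name)
  const loadedNames = new Set(loaded.map(r => r.stageLocation + r.fileName));
  return staged
    .filter(f => {
      // 5 minutes, plus one minute per GB (rounded), before a file counts as late
      const delayMinutes = 5 + Math.round(f.size / GB);
      return f.lastModified.getTime() + delayMinutes * 60000 < now.getTime();
    })
    .filter(f => !loadedNames.has(f.name))
    // Strip the stage location so the path can be used as a REFRESH prefix
    .map(f => f.name.replace(stageLocation, ''));
}

// Hypothetical example data
const now = new Date('2024-01-02T04:00:00Z');
const stageLocation = 's3://my-bucket/data/';
const staged = [
  { name: 's3://my-bucket/data/2024/01/02/03/a.json', size: 1000, lastModified: new Date('2024-01-02T03:10:00Z') },
  { name: 's3://my-bucket/data/2024/01/02/03/b.json', size: 1000, lastModified: new Date('2024-01-02T03:20:00Z') },
  { name: 's3://my-bucket/data/2024/01/02/03/c.json', size: 1000, lastModified: new Date('2024-01-02T03:58:00Z') },
];
const loaded = [{ stageLocation, fileName: '2024/01/02/03/a.json' }];

// a.json was loaded, and c.json is too recent to be flagged
console.log(findMissingFiles(staged, loaded, stageLocation, now)); // [ '2024/01/02/03/b.json' ]
```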

Here is the code for the procedure:

-- Returns a list of files missing from the destination table (separated by new lines).
-- Returns NULL if there are no missing files.
CREATE OR REPLACE PROCEDURE verify_pipe_load(
  -- The FQN of the pipe (used to auto ingest):
  PIPE_FQN STRING,
  -- Stage to get the files from (same as the pipe definition):
  STAGE_NAME STRING,
  -- Destination table FQN (same as the pipe definition):
  TABLE_FQN STRING,
  -- File prefix (to filter files):
  --  This should be based on a timestamp (ex: /YYYY/MM/DD/HH/)
  --  in order to restrict files to a specific time interval
  PREFIX STRING,
  -- The time to get the loaded files from (should match the prefix);
  --  note that COPY_HISTORY only returns loads from the last 14 days:
  START_TIME STRING,
  -- What to do with the missing files (if any):
  --  'RETURN': Return the list of missing files.
  --  'INGEST': Automatically ingest the missing files (and return the list).
  --  'ERROR': Make the procedure fail by throwing an exception.
  MODE STRING
)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
  EXECUTE AS CALLER
AS
$$
  MODE = MODE.toUpperCase();
  if (!['RETURN', 'INGEST', 'ERROR'].includes(MODE)) {
    throw `Exception: Invalid mode '${MODE}'. Must be one of 'RETURN', 'INGEST' or 'ERROR'`;
  }

  let tableDB = TABLE_FQN.split('.')[0];
  let [pipeDB, pipeSchema, pipeName] = PIPE_FQN.split('.')
    .map(name => name.startsWith('"') && name.endsWith('"')
      ? name.slice(1, -1)
      : name.toUpperCase()
    );

  let listQueryId = snowflake.execute({sqlText: `
    LIST @${STAGE_NAME}${PREFIX};
  `}).getQueryId();

  let missingFiles = snowflake.execute({sqlText: `
    WITH staged_files AS (
      SELECT
        "name" AS name,
        TO_TIMESTAMP_NTZ(
          "last_modified",
          'DY, DD MON YYYY HH24:MI:SS GMT'
        ) AS last_modified,
        -- Add a minute per GB, to account for larger file size = longer ingest time
        ROUND("size" / 1024 / 1024 / 1024) AS ingest_delay,
        -- Estimate the time by which the ingest should be done (default 5 minutes)
        DATEADD(minute, 5 + ingest_delay, last_modified) AS ingest_done_ts
      FROM TABLE(RESULT_SCAN('${listQueryId}'))
      -- Ignore files that may not be done being ingested yet
      WHERE ingest_done_ts < CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())::TIMESTAMP_NTZ
    ), pipe_loads AS (
      -- Query COPY_HISTORY once (it only covers the last 14 days)
      SELECT stage_location, stage_location || file_name AS name
      FROM TABLE(
        ${tableDB}.information_schema.copy_history(
          table_name => '${TABLE_FQN}',
          start_time => '${START_TIME}'::TIMESTAMP_LTZ
        )
      )
      WHERE pipe_catalog_name = '${pipeDB}'
        AND pipe_schema_name = '${pipeSchema}'
        AND pipe_name = '${pipeName}'
    ), stage AS (
      -- Note: if the pipe loaded nothing since START_TIME, this is empty
      -- and no missing files can be reported
      SELECT DISTINCT stage_location FROM pipe_loads
    ), missing_files AS (
      -- Strip the stage location to get a path relative to the stage
      SELECT REPLACE(f.name, stage.stage_location) AS prefix
      FROM staged_files f
      CROSS JOIN stage
      -- NOT EXISTS (unlike NOT IN) is not defeated by NULL names
      WHERE NOT EXISTS (
        SELECT 1 FROM pipe_loads l WHERE l.name = f.name
      )
    )
    SELECT LISTAGG(prefix, '\n') AS "missing_files"
    FROM missing_files;
  `});

  if (!missingFiles.next()) return null;
  missingFiles = missingFiles.getColumnValue('missing_files');
  // LISTAGG yields an empty string (or NULL) when no files are missing
  if (!missingFiles) return null;

  if (MODE == 'ERROR') {
    throw `Exception: Found missing files:\n'${missingFiles}'`;
  }

  if (MODE == 'INGEST') {
    missingFiles
      .split('\n')
      .forEach(file => snowflake.execute({sqlText: `
        ALTER PIPE ${PIPE_FQN} REFRESH PREFIX = '${file.replace(/'/g, "''")}';
      `}));
  }

  return missingFiles;
$$
;
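The INGEST branch can also be isolated so it can be tested outside Snowflake. A minimal sketch: refreshStatements is a hypothetical helper that turns the newline-separated result into ALTER PIPE ... REFRESH statements, doubling any single quote so an unusual file name cannot terminate the SQL string literal early.

```javascript
// Hypothetical helper: build one REFRESH statement per missing file,
// escaping single quotes by doubling them inside the SQL string literal.
function refreshStatements(pipeFqn, missingFiles) {
  // The procedure returns NULL (or an empty string) when nothing is missing
  if (!missingFiles) return [];
  return missingFiles
    .split('\n')
    .filter(path => path.length > 0)
    .map(path => `ALTER PIPE ${pipeFqn} REFRESH PREFIX = '${path.replace(/'/g, "''")}';`);
}

console.log(refreshStatements(
  'my_db.my_schema.my_pipe',
  "2024/01/02/03/b.json\n2024/01/02/03/o'brien.json"
));
```

Keep in mind that REFRESH treats the value as a prefix, so a path like a.json would also match a.json.bak. Also, according to Snowflake's ALTER PIPE documentation, REFRESH only considers files staged within the previous 7 days, so the 'INGEST' mode cannot recover older files.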
