vvazza

Reputation: 397

Copy latest file from S3 stage to Snowflake table using COPY command

I have an S3 stage partitioned by date, and I expect a file every hour, every day. I want to pick only the latest file from the S3 stage using the COPY command.

How can I specify the COPY command so it picks only the latest file? I read that Snowflake keeps load metadata history for 64 days to avoid loading the same file twice, but I was wondering if there is any way to pick only the latest file through the COPY command. This is the file format I'm using:

FILE_FORMAT=(type=csv 
            compression=gzip 
            field_delimiter=','  
            skip_header=1 
            field_optionally_enclosed_by='\"' 
            empty_field_as_null=true 
            NULL_IF = ('NULL','null','') 
            date_format='yyyy-mm-dd' time_format='hh24:mi:ss.ff' 
            )
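
For reference, a full COPY statement with this file format would look roughly like the following (the table, stage, and date path below are placeholders):

copy into my_table
from @my_s3_stage/2021/03/17/    -- hypothetical date partition in the stage path
file_format=(type=csv
             compression=gzip
             field_delimiter=','
             skip_header=1
             field_optionally_enclosed_by='"'
             empty_field_as_null=true
             NULL_IF = ('NULL','null','')
             date_format='yyyy-mm-dd' time_format='hh24:mi:ss.ff'
             );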

Upvotes: 1

Views: 2720

Answers (1)

Greg Pavlik

Reputation: 11046

I think you're going to need a stored procedure to do this. I lifted some code from a project I wrote to bulk load files in order: https://github.com/GregPavlik/snowflake_bulk_loader/blob/master/02.%20Bulk%20Load%20-%20Runtime.sql

The first challenge is that the LIST command returns the files' last modified times in a date format that doesn't match the standard timestamp literal formats, so there's a UDF to convert the last modified time to a Snowflake timestamp. The second challenge is that LIST includes the stage name in the path, but in slightly different ways depending on whether the stage is internal or external and, if external, on the cloud host, so there's another UDF to strip the stage name from the path.
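
To see what the SP works against, you can run a LIST and inspect its output through RESULT_SCAN; the "name" and "last_modified" columns are the ones the two UDFs below parse (the stage name here is a placeholder):

list @TEST_STAGE;

-- "last_modified" comes back like 'Tue, 17 Mar 2020 13:00:00 GMT',
-- which the standard timestamp literal formats won't parse directly.
select "name", "last_modified"
from table(result_scan(last_query_id()));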

From there, the SP lists the files, grabs the most recent one, and issues a COPY command. You can follow what it's doing and change the appropriate points to your stage name, table name, and COPY command.

This assumes you're using a named stage (external stages are always named, as are internal stages you reference with @). If not, you can adjust the code that prefixes the stage name with @.
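
If you don't have a named stage yet, a minimal external one can be created along these lines (the bucket URL and credentials below are placeholders, not part of the SP):

create stage TEST_STAGE
  url = 's3://my-bucket/hourly/'                              -- hypothetical bucket and prefix
  credentials = (aws_key_id = '****' aws_secret_key = '****');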

-- Converts the "last_modified" string LIST returns (e.g. 'Tue, 17 Mar 2020 13:00:00 GMT')
-- into a TIMESTAMP_TZ by trimming the trailing ' GMT' and appending '00:00' as the UTC offset.
create or replace function LAST_MODIFIED_TO_TIMESTAMP(LAST_MODIFIED string) 
returns timestamp_tz
as
$$
    to_timestamp_tz(left(LAST_MODIFIED, len(LAST_MODIFIED) - 4) || ' ' || '00:00', 'DY, DD MON YYYY HH:MI:SS TZH:TZM')
$$;
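
As a quick sanity check, you can call the UDF directly on a value shaped like what LIST returns (the timestamp below is made up):

select LAST_MODIFIED_TO_TIMESTAMP('Tue, 17 Mar 2020 09:13:58 GMT');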


create or replace function STAGE_PATH_SHORTEN(FILE_PATH string)
returns string
language javascript
as
$$
    /*
        Removes the cloud provider prefix and stage name from the file path
    */
    // External S3 stage: strip "s3://" and the bucket name (everything up to and including the first "/" after the prefix).
    var s3 = FILE_PATH.search(/s3:\/\//i);

    if (s3 != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", s3 + 5) + 1);
    }

    // External Azure stage: strip "azure://" and the storage host (everything up to and including the first "/" after the prefix).
    var azure = FILE_PATH.search(/azure:\/\//i);

    if (azure != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", azure + 8) + 1);
    }

    // Internal stage path of the form "stages/<name>/...": strip through the "/" after the stage name.
    var newStyleInternal = FILE_PATH.search(/stages\//i);

    if (newStyleInternal != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", newStyleInternal + 7) + 1);
    }

    // Internal stage path of the form "stages<guid>/...": strip through that "/".
    var guidStyleInternal = FILE_PATH.search(/stages[a-zA-Z0-9]{4,10}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{12}\//i);

    if (guidStyleInternal != -1){
        return FILE_PATH.substring(FILE_PATH.indexOf("/", guidStyleInternal) + 1);
    }

    // Fallback: treat everything before the first "/" as the stage name.
    var stageInStr = FILE_PATH.indexOf("/");

    if (stageInStr != -1){
        return FILE_PATH.substring(stageInStr + 1);
    }

    throw "Unknown file path type.";
$$;
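
You can sanity-check the path shortener the same way with a made-up external path (the bucket, folders, and file name below are placeholders):

-- Returns '2021/03/17/data_0500.csv.gz'
select STAGE_PATH_SHORTEN('s3://my-bucket/2021/03/17/data_0500.csv.gz');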


create or replace procedure INGEST_MOST_RECENT_FILE(STAGE_NAME string, TARGET_TABLE string)
returns string
language javascript
execute as caller
as
$$

try{

    // Run LIST first so RESULT_SCAN(LAST_QUERY_ID()) in the next query can read its output.
    getResultSet(`list @${STAGE_NAME}`);
    var fileName = executeSingleValueQuery('FILE_NAME', 
                                           `select stage_path_shorten("name") as FILE_NAME, LAST_MODIFIED_TO_TIMESTAMP("last_modified") as LAST_MODIFIED_TS 
                                            from table(result_scan(last_query_id())) order by LAST_MODIFIED_TS desc limit 1;`);
    
    // Modify with your COPY INTO statement here, but leave the last part -- files=('${fileName}') -- unmodified.
    var copyStatus = executeSingleValueQuery("status",

`
copy into ${TARGET_TABLE} from @${STAGE_NAME} file_format=(type=CSV) files=('${fileName}') ;
`

    );
    
    return `Copy status: ${copyStatus}.`;

} catch (err) {
    return `Error: ${err.message}.`;
}

// Executes a SQL statement and returns its result set.
function getResultSet(sql){
    var cmd = {sqlText: sql};
    var stmt = snowflake.createStatement(cmd);
    return stmt.execute();
}

// Executes a query and returns the value of the named column from its first row.
function executeSingleValueQuery(columnName, queryString) {
    var cmd = {sqlText: queryString};
    var stmt = snowflake.createStatement(cmd);
    try{
        var rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(columnName);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        }
    }
}

$$;

call ingest_most_recent_file('TEST_STAGE', 'TARGET_TABLE');

Upvotes: 2
