maverickz

Reputation: 193

Snowpipe: Load base64 encoded JSON to table

We have a JSON file in S3 containing base64 encoded JSON events. The decoded events will be of the format:

{
    "payload": <bytes>,
    "context": {
        "metadata1": "metadata1_val",
        "metadata2": "metadata2_val"
    }
}

where payload is base64 encoded JSON as well. The decoded payload will be of the format:

{
    "event_key1": "event_val1",
    "event_key2": "event_val2",
    "event_key3": "event_val3"
}
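
For illustration, this double encoding can be reproduced with Snowflake's OBJECT_CONSTRUCT, TO_JSON, and BASE64_ENCODE built-ins (all keys and values below are made up):

// Sketch: build one doubly encoded event mirroring the structure above
select base64_encode(to_json(object_construct(
         'payload', base64_encode(to_json(object_construct(
                      'event_key1', 'event_val1',
                      'event_key2', 'event_val2'))),
         'context', object_construct('metadata1', 'metadata1_val')
       ))) as encoded_event;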

Since Snowflake stage file formats are restricted to CSV, JSON, AVRO, ORC, PARQUET, and XML, with no direct support for a binary format, we created a custom JSON file format as follows:

create or replace file format BINARY_JSON_FORMAT
  type = 'JSON'
  BINARY_FORMAT = 'BASE64'
  strip_outer_array = true;

We couldn't get this to load the binary data into a single column that we could then decode.

CREATE OR REPLACE STAGE TEST_STAGE
 storage_integration = <storage_integration>
 url='s3://bucket/test/'
 file_format = BINARY_JSON_FORMAT;

// The following could not parse the file
select parse_json(base64_decode_string($1)):payload as payload from @TEST_STAGE/test-file;
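
One untested alternative: since each staged record is a base64 string rather than parseable JSON, bypass JSON parsing entirely and read each record as one raw string via a CSV file format with no field delimiter (RAW_LINE_FORMAT is a placeholder name):

// Untested sketch: read each newline-delimited record as one raw string in $1
create or replace file format RAW_LINE_FORMAT
  type = 'CSV'
  field_delimiter = NONE
  record_delimiter = '\n';

select parse_json(base64_decode_string($1)):payload as payload
from @TEST_STAGE/test-file (file_format => 'RAW_LINE_FORMAT');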

Any thoughts on how this could be achieved? We want to wrap this SELECT in a COPY INTO statement to create a Snowpipe that ingests the data from S3.

COPY INTO TEST_TABLE (
  event_key1,
  event_key2,
  event_key3
) from (
  select 
    parse_json(base64_decode_string($1)):payload.event_key1 event_key1,
    parse_json(base64_decode_string($1)):payload.event_key2 event_key2,
    parse_json(base64_decode_string($1)):payload.event_key3 event_key3
  from @TEST_STAGE
);
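
For reference, the eventual pipe wrapper would look something like the following sketch (the pipe name and AUTO_INGEST setting are placeholders, and this assumes the inner SELECT can be made to work):

// Sketch of the eventual Snowpipe wrapping the COPY INTO above
create or replace pipe TEST_PIPE
  auto_ingest = true
as
copy into TEST_TABLE (event_key1, event_key2, event_key3)
from (
  select
    parse_json(base64_decode_string($1)):payload.event_key1,
    parse_json(base64_decode_string($1)):payload.event_key2,
    parse_json(base64_decode_string($1)):payload.event_key3
  from @TEST_STAGE
);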

Upvotes: 2

Views: 465

Answers (1)

Simeon Pilgrim

Reputation: 25978

Well, years ago I did the following:

-- SETUP
CREATE TABLE history_diags(
  unit_id NUMBER,
  time TIMESTAMP_NTZ,
  data VARIANT);

CREATE TABLE raw_history_diags(
  unit_id NUMBER,
  time TIMESTAMP_NTZ,
  data STRING);

CREATE STAGE point_history
  URL = 's3://bucket_name/'
  CREDENTIALS = ( creds stuff )
  COMMENT = 'S3 bucket for history data';


-- PROCESSING  
COPY INTO raw_history_diags
FROM '@point_history/' 
    pattern='.*diag_history_.*'
    file_format = 'POINT_RAW';


INSERT INTO HISTORY_DIAGS(unit_id, time, data)
  SELECT 
      unit_id,
      time, 
      iff(data is not null, unpack_base64_tids(data), parse_json('{}'))
  FROM RAW_HISTORY_DIAGS;

DELETE FROM RAW_HISTORY_DIAGS;

where unpack_base64_tids was fundamentally base64_decode_string, so you should be able to combine those via something like the following. Note that, per the question, payload is itself base64 encoded, so it needs a second decode:

INSERT INTO TEST_TABLE (event_key1, event_key2, event_key3)
SELECT
    payload:event_key1 AS event_key1,
    payload:event_key2 AS event_key2,
    payload:event_key3 AS event_key3
FROM (
    -- second decode: payload is itself base64 encoded JSON
    SELECT parse_json(base64_decode_string(json:payload::string)) AS payload
    FROM (
        -- first decode: the staged record
        SELECT parse_json(base64_decode_string($1)) AS json
        FROM @TEST_STAGE/test-file
    )
);
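
A quick way to sanity check that decode chain without touching the stage is to round-trip a made-up event through the same encoding (sketch; all values are hypothetical):

-- Round-trip test: encode a fake event, then decode it the same way
with sample as (
  select base64_encode(to_json(object_construct(
           'payload', base64_encode(to_json(object_construct('event_key1', 'event_val1'))),
           'context', object_construct('metadata1', 'metadata1_val')
         ))) as raw
)
select parse_json(base64_decode_string(
         parse_json(base64_decode_string(raw)):payload::string
       )):event_key1::string as event_key1
from sample;
-- expected: event_val1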

Upvotes: 3
