Alex

Reputation: 39

"UPSERT" a Redshift table using Kinesis Firehose

I'm setting up a Kinesis Data Stream -> Firehose -> Redshift data pipeline.

Docs say that Firehose might introduce duplicates.

To handle this, I'm trying to use the "upsert" approach described here.

Is it possible to invoke such a transaction through the Firehose COPY command?

What are the best practices?

Thanks in advance!

Helpful code:

create table target
(
    key BIGINT,
    value VARCHAR
);

An example record arriving on the stream:

{"key": 1, "value": "stub 1"}

begin transaction;
    
    -- create a stage table like a target table
    create temp table stage (like target);

    -- populate the stage table
    COPY stage FROM 's3://<bucket-name>/<manifests-folder>/' 
    CREDENTIALS 'aws_iam_role=arn:aws:iam::<iam>:role/<role>' 
    MANIFEST format as json 'auto';

    -- delete target rows that will be replaced by rows from the stage table
    delete from target using stage
    where target.key = stage.key;

    -- populate the target table
    insert into target
    (select * from stage);

    -- drop the stage table
    drop table stage;

end transaction;

Upvotes: 0

Views: 1403

Answers (1)

Bill Weiner

Reputation: 11082

Yes, you can set up a trigger on Firehose, but I recommend that you don't. If you are using Firehose you have data coming in quickly, and Firehose will COPY to Redshift as it needs to. The issue that comes up is how fast you really want tables changing in Redshift. Redshift is big, but there are practical limits to how fast you want to be adding incremental data to your fact tables. This rate will depend on what transforms are needed after ingestion, how often you can afford to vacuum / analyze, how heavy your query load and complexity are, etc. Refreshing every 5 minutes is usually possible; faster is doable too under the right conditions.
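
If you do refresh that often, a periodic maintenance pass on the fact table is worth budgeting for. A minimal sketch, using the target table from the question:

vacuum delete only target;
analyze target;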

So a process that collects all the new data in the staging table and adds it to the target table every 1-5 minutes is the way to let the two run at independent rates. CloudWatch can trigger a Lambda at whatever interval you like, and the Lambda can issue the SQL to Redshift. The SQL will look like this (a rough sketch follows the list):

  1. rename the staging table and create a new staging table, in a single transaction
  2. in a new transaction, lock the renamed staging table exclusively so that all in-flight COPY commits are complete before the process continues
  3. ingest the renamed staging table into the target table, as you are doing above, in a transaction
  4. drop the renamed staging table
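
Here is a rough sketch of that sequence, assuming Firehose is configured to COPY into a persistent staging table named firehose_stage and that the Lambda generates the suffix on the renamed copy (both names are illustrative, not something from the original post):

-- transaction 1: swap the staging table out from under Firehose
begin transaction;
    alter table firehose_stage rename to firehose_stage_4821;  -- suffix generated by the Lambda
    create table firehose_stage (like target);                 -- fresh, empty table for new COPYs
end transaction;

-- transaction 2: wait for in-flight COPYs, then merge into the target
begin transaction;
    lock firehose_stage_4821;  -- blocks until any COPY still writing to the old table has committed

    delete from target using firehose_stage_4821
    where target.key = firehose_stage_4821.key;

    insert into target
    (select * from firehose_stage_4821);
end transaction;

-- finally, drop the renamed staging table
drop table firehose_stage_4821;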

The key is to use Redshift's coherency to ensure that there is no data loss or duplication from the two processes, Firehose and the Lambda, running at independent rates. Each COPY either starts before or after the COMMIT of the staging-table rename transaction. If before, the incoming data goes to the renamed staging table, and we need to be sure that ingestion into the target table doesn't start before that COPY is complete and committed (this is what the exclusive lock is for). If after, the data goes to the new staging table. I'd add a random number to the renamed staging table's name so that only the Lambda will use this name - this prevents name collisions if the rate of ingestion is too high for a large wave of incoming data and these processes overlap.

Upvotes: 3
