Reputation: 39
I'm setting up a Kinesis Data Stream -> Firehose -> Redshift data pipeline.
Docs say that Firehose might introduce duplicates.
To handle this, I'm trying to use the "upsert" approach from here.
Is it possible to have Firehose invoke a transaction through its COPY command?
What are the best practices?
Thanks in advance!
Helpful code:
create table target
(
key BIGINT,
value VARCHAR
);
{"key": 1, "value": "stub 1"}
begin transaction;
-- create a stage table like a target table
create temp table stage (like target);
-- populate the stage table
COPY stage FROM 's3://<bucket-name>/<manifests-folder>/'
CREDENTIALS 'aws_iam_role=arn:aws:iam::<iam>:role/<role>'
MANIFEST format as json 'auto';
-- delete duplicates from the target table
delete from target using stage
where target.key = stage.key;
-- populate the target table
insert into target
(select * from stage);
-- drop the stage table
drop table stage;
end transaction;
Upvotes: 0
Views: 1403
Reputation: 11082
Yes, you can set up a trigger on Firehose, but I recommend that you don't. If you are using Firehose, you have data coming in quickly, and Firehose will COPY to Redshift as it needs to. The issue that comes up is how fast you really want tables changing in Redshift. Redshift is big, but there are practical limits to how fast you want to add incremental data to your fact tables. This rate will depend on what transforms are needed after ingestion, how often you can afford to VACUUM / ANALYZE, how heavy your query load / complexity is, etc. Refreshing every 5 minutes is usually possible; faster is doable too under the right conditions.
So a process that collects up all the new data in the staging table and adds it to the target table every 1-5 minutes is the way to do this, letting the two sides run at independent rates. CloudWatch can trigger a Lambda at whatever interval you like, and the Lambda can issue the SQL to Redshift. The SQL will look like:
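A minimal sketch of that rotation step, assuming Firehose delivers into the stage table from the question and that the Lambda generates a random suffix (1234 here is just a placeholder):
begin transaction;
-- swap in a fresh staging table; Firehose keeps COPYing into "stage"
alter table stage rename to stage_processing_1234;
create table stage (like target);
end transaction;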
What is key is to use Redshift's coherency to ensure that there is no data loss or duplication from the two processes, Firehose and the Lambda, running at independent rates. Each COPY either starts before or after the COMMIT of the staging table rename transaction. If before, the incoming data goes to the renamed staging table, and we need to be sure that ingestion into the target table doesn't start before that COPY is complete and committed. If after, the data goes to the new staging table. I'd add a random number to the renamed staging table's name so that only the Lambda will use this name - this prevents name collisions if a large wave of incoming data pushes the ingestion rate high enough that these processes overlap.
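Continuing with the same hypothetical names, a sketch of the merge the Lambda runs once any COPY that started before the rename has committed:
begin transaction;
-- delete rows that will be replaced by the new batch
delete from target using stage_processing_1234
where target.key = stage_processing_1234.key;
-- add the new batch to the target table
insert into target
(select * from stage_processing_1234);
-- clean up the processed staging table
drop table stage_processing_1234;
end transaction;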
Upvotes: 3