Gili Nachum

Reputation: 5578

How to fan out an AWS Kinesis stream?

I'd like to fan out/chain/replicate an input AWS Kinesis stream to N new Kinesis streams, so that each record written to the input stream will appear in each of the N streams.

Is there an AWS service or an open source solution?

I prefer not to write code to do that if there's a ready-made solution. AWS Kinesis Firehose is not a solution because it can't output to Kinesis. Perhaps an AWS Lambda solution, if that won't be too expensive to run?

Upvotes: 20

Views: 12714

Answers (3)

Tri Q Tran

Reputation: 5690

There are two AWS-native solutions for fanning out Kinesis streams that don't require AWS Firehose or AWS Lambda.

  1. Similar to Kafka consumer groups, Kinesis has the notion of an application name. Every consumer of the stream can provide a unique application name. If two consumers have the same application name, messages are distributed between them. To fan out the stream, give a different application name to each consumer that you want to receive all of the messages from the stream. Under the hood, Kinesis creates a new DynamoDB table for each application name to keep track of each consumer's progress, so they can consume messages at different rates, etc.
  2. Use Kinesis Enhanced Fan-Out for higher throughput (up to 2 MiB per second per shard, per consumer); this throughput does not count towards the stream's shared read limit. At the time of writing, there is a limit of 20 "enhanced fan-out" consumers per stream.

One caveat, as far as I am aware, is that both of these options require you to use the Kinesis Client Library (KCL) rather than the raw AWS SDK.
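
For illustration only, registering enhanced fan-out consumers can also be done with the plain AWS SDK (boto3 here), although consuming the records via SubscribeToShard is still easiest through the KCL. A minimal sketch, assuming boto3 and a hypothetical stream ARN:

# Minimal sketch (assumptions: boto3 is available; the stream ARN is hypothetical).
import boto3

kinesis = boto3.client("kinesis", region_name="us-west-2")

stream_arn = "arn:aws:kinesis:us-west-2:123456789012:stream/input"  # hypothetical ARN

# Register two enhanced fan-out consumers; each one gets its own dedicated
# read throughput on the same stream.
for name in ["consumer-app-1", "consumer-app-2"]:
    response = kinesis.register_stream_consumer(
        StreamARN=stream_arn,
        ConsumerName=name,
    )
    print(response["Consumer"]["ConsumerARN"])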

Upvotes: 1

Hammer

Reputation: 8888

There is a GitHub repo from AWS Labs that provides fan-out using Lambda: https://github.com/awslabs/aws-lambda-fanout . Also read "Transforming a synchronous Lambda invocation into an asynchronous one" in https://medium.com/retailmenot-engineering/building-a-high-throughput-data-pipeline-with-kinesis-lambda-and-dynamodb-7d78e992a02d , which is critical to building truly asynchronous processing.

Upvotes: 2

John Rotenstein

Reputation: 270134

There are two ways you could accomplish fan-out of an Amazon Kinesis stream:

  • Use Amazon Kinesis Analytics to copy records to additional streams
  • Trigger an AWS Lambda function to copy records to another stream

Option 1: Using Amazon Kinesis Analytics to fan-out

You can use Amazon Kinesis Analytics to generate a new stream from an existing stream.

From the Amazon Kinesis Analytics documentation:

Amazon Kinesis Analytics applications continuously read and process streaming data in real-time. You write application code using SQL to process the incoming streaming data and produce output. Then, Amazon Kinesis Analytics writes the output to a configured destination.

[Amazon Kinesis Analytics flow diagram]

Fan-out is mentioned in the Application Code section:

You can also write SQL queries that run independent of each other. For example, you can write two SQL statements that query the same in-application stream, but send output into different in-application streams.

I managed to implement this as follows:

  • Created three streams: input, output1, output2
  • Created two Amazon Kinesis Analytics applications: copy1, copy2

The Amazon Kinesis Analytics SQL application looks like this:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

This code creates a pump (think of it as a continual select statement) that selects from the input stream and outputs to the output1 stream. I created another identical application that outputs to the output2 stream.
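
If you prefer to script this setup instead of using the console, the same application can, as far as I know, be created with boto3's kinesisanalytics client. This is only a rough sketch; the stream and role ARNs are hypothetical, and you would still need to call start_application afterwards:

# Rough sketch (assumptions: boto3; hypothetical stream and IAM role ARNs).
import boto3

analytics = boto3.client("kinesisanalytics", region_name="us-west-2")

application_code = """
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (log VARCHAR(16));
CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";
"""

analytics.create_application(
    ApplicationName="copy1",
    ApplicationCode=application_code,
    Inputs=[{
        "NamePrefix": "SOURCE_SQL_STREAM",
        "KinesisStreamsInput": {
            "ResourceARN": "arn:aws:kinesis:us-west-2:123456789012:stream/input",  # hypothetical
            "RoleARN": "arn:aws:iam::123456789012:role/kinesis-analytics-role",    # hypothetical
        },
        "InputSchema": {
            "RecordFormat": {
                "RecordFormatType": "JSON",
                "MappingParameters": {"JSONMappingParameters": {"RecordRowPath": "$"}},
            },
            "RecordColumns": [{"Name": "log", "Mapping": "$.log", "SqlType": "VARCHAR(16)"}],
        },
    }],
    Outputs=[{
        "Name": "DESTINATION_SQL_STREAM",
        "KinesisStreamsOutput": {
            "ResourceARN": "arn:aws:kinesis:us-west-2:123456789012:stream/output1",  # hypothetical
            "RoleARN": "arn:aws:iam::123456789012:role/kinesis-analytics-role",      # hypothetical
        },
        "DestinationSchema": {"RecordFormatType": "JSON"},
    }],
)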

To test, I sent data to the input stream:

#!/usr/bin/env python
# Simple producer: writes a JSON record to the 'input' stream every two seconds.

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data = {}
  data['log'] = 'Record ' + str(i)
  i += 1
  print(data)
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

I let it run for a while, then displayed the output using this code:

# Read the first few records from 'output1', starting from the oldest record (TRIM_HORIZON).
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print([r['Data'] for r in records['Records']])

The output was:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

I ran it again for output2 and the identical output was shown.

Option 2: Using AWS Lambda

If you are fanning out to many streams, a more efficient method might be to create an AWS Lambda function:

  • Triggered by Amazon Kinesis stream records
  • That writes records to multiple Amazon Kinesis 'output' streams

You could even have the Lambda function self-discover the output streams based on a naming convention (e.g. any stream named app-output-*).
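
A minimal sketch of such a Lambda function, assuming a Python 3 runtime, boto3, and the hypothetical app-output-* naming convention (stream-list pagination, batching limits and error handling are omitted):

# Minimal sketch (assumptions: Python 3 Lambda runtime, boto3, and destination
# streams that follow the hypothetical "app-output-" naming convention).
import base64
import boto3

kinesis = boto3.client("kinesis")

def lambda_handler(event, context):
    # Discover the output streams by naming convention.
    streams = kinesis.list_streams()["StreamNames"]
    outputs = [name for name in streams if name.startswith("app-output-")]

    # Re-publish every incoming record to every output stream,
    # preserving the original partition key.
    records = [
        {
            "Data": base64.b64decode(r["kinesis"]["data"]),
            "PartitionKey": r["kinesis"]["partitionKey"],
        }
        for r in event["Records"]
    ]
    for stream in outputs:
        kinesis.put_records(StreamName=stream, Records=records)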

Upvotes: 19
