emrahozkan

Reputation: 193

kinesis getting data from multiple shards

I am trying to build a simple application that reads data from AWS Kinesis. I have managed to read data from a single shard, but I want to get data from 4 different shards.

The problem is that I have a while loop that iterates as long as the shard is active, which prevents me from reading data from the other shards. So far I couldn't find an alternative algorithm, nor was I able to implement a KCL-based solution. Many thanks in advance

public static void DoSomething() {
        AmazonKinesisClient client = new AmazonKinesisClient();
        //noinspection deprecation
        client.setEndpoint(endpoint, serviceName, regionId);  
        // get the shards in the stream using DescribeStream

        DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
        describeStreamRequest.setStreamName(streamName);
        List<Shard> shards = new ArrayList<>();
        String exclusiveStartShardId = null;
        do {
            describeStreamRequest.setExclusiveStartShardId(exclusiveStartShardId);
            DescribeStreamResult describeStreamResult = client.describeStream(describeStreamRequest);
            shards.addAll(describeStreamResult.getStreamDescription().getShards());
            if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) {
                exclusiveStartShardId = shards.get(shards.size() - 1).getShardId();
            } else {
                exclusiveStartShardId = null;
            }
        }while (exclusiveStartShardId != null);

        // shards obtained
        String shardIterator;

        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shards.get(0).getShardId());
        getShardIteratorRequest.setShardIteratorType("LATEST"); 

        GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
        shardIterator = getShardIteratorResult.getShardIterator();
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();

        while (shardIterator != null) {
            getRecordsRequest.setShardIterator(shardIterator);
            getRecordsRequest.setLimit(250);
            GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest);
            List<Record> records = getRecordsResult.getRecords();

            shardIterator = getRecordsResult.getNextShardIterator();
            if (!records.isEmpty()) {
                for(Record r : records) {
                    System.out.println(r.getPartitionKey());
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop polling instead of swallowing it
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

Upvotes: 4

Views: 2659

Answers (2)

Dhwani Katagade

Reputation: 1292

For a simpler and neater solution where you only have to worry about providing your own message processing code, I would recommend using the KCL (Kinesis Client Library).

Quoting from the documentation

The KCL acts as an intermediary between your record processing logic and Kinesis Data Streams. The KCL performs the following tasks:

  • Connects to the data stream
  • Enumerates the shards within the data stream
  • Uses leases to coordinate shard associations with its workers
  • Instantiates a record processor for every shard it manages
  • Pulls data records from the data stream
  • Pushes the records to the corresponding record processor
  • Checkpoints processed records
  • Balances shard-worker associations (leases) when the worker instance count changes or when the data stream is resharded (shards are split or merged)
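
To make the division of labor concrete, here is a minimal sketch of the per-shard processor pattern the list above describes. Note the `RecordProcessor` interface below is a simplified stand-in written for this answer, not the real KCL interface (`IRecordProcessor`, which also passes a checkpointer and shutdown reason); with the actual library you implement that interface, hand a factory to a `Worker`, and the KCL instantiates one processor per shard for you.

```java
import java.util.List;

// Simplified stand-in for the KCL's IRecordProcessor: one instance per shard,
// initialized with its shard id, then fed batches of records by the library.
interface RecordProcessor {
    void initialize(String shardId);
    void processRecords(List<String> records);
}

// Your only job with the KCL: supply the processing logic.
class PrintingProcessor implements RecordProcessor {
    private String shardId;
    int processed = 0; // exposed for the demo below

    @Override
    public void initialize(String shardId) {
        this.shardId = shardId;
    }

    @Override
    public void processRecords(List<String> records) {
        for (String r : records) {
            System.out.println(shardId + ": " + r);
            processed++;
        }
        // With the real KCL you would checkpoint here once the batch
        // is durably handled.
    }
}

public class KclSketch {
    public static void main(String[] args) {
        // The KCL would do this for each shard it manages; here we simulate
        // two shards assigned to two independent processor instances.
        PrintingProcessor p1 = new PrintingProcessor();
        p1.initialize("shardId-000000000000");
        p1.processRecords(List.of("a", "b"));

        PrintingProcessor p2 = new PrintingProcessor();
        p2.initialize("shardId-000000000001");
        p2.processRecords(List.of("c"));

        System.out.println("processed=" + (p1.processed + p2.processed));
    }
}
```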

Upvotes: 0

Guy

Reputation: 12901

It is recommended that you not read from multiple shards in a single process/worker. First, as you can see, it adds complexity to your code, but more importantly, you will have problems scaling up.

The "secret" of scalability is to have small and independent workers or other such units. You can see this design in Hadoop, and in AWS services such as DynamoDB and Kinesis. It allows you to build small systems (micro-services) that can easily scale up and down as needed. You can easily add more units of work/data as your service becomes more successful, or as its usage fluctuates.

As you can see in these AWS services, you sometimes get this scalability automatically, as in DynamoDB, and sometimes you need to add shards to your Kinesis streams yourself. Either way, your application needs some way to control its scalability.

In the case of Kinesis, you can scale up and down using AWS Lambda or the Kinesis Client Library (KCL). Both of them listen to the status of your streams (number of shards and events) and use it to add or remove workers and to deliver events to them for processing. In both of these solutions you should build a worker that works against a single shard.
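
If you want to stay on the raw GetRecords API rather than Lambda or the KCL, the same principle still applies: run one polling loop per shard, each in its own worker. Here is a minimal sketch of that shape; the shard sources are mocked with in-memory queues here, whereas a real worker would hold its own shard iterator and call getShardIterator/getRecords for exactly one shard.

```java
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerShardWorkers {
    // Stand-in for Kinesis: one queue of records per shard. A real worker
    // would instead call getShardIterator(shardId) once, then loop on
    // getRecords/getNextShardIterator for that one shard only.
    static final Map<String, Queue<String>> SHARDS = new ConcurrentHashMap<>();

    // The per-shard loop: each worker touches exactly one shard.
    public static int drainShard(String shardId) {
        int processed = 0;
        Queue<String> shard = SHARDS.get(shardId);
        String record;
        while ((record = shard.poll()) != null) { // real code: until nextShardIterator is null
            System.out.println(shardId + ": " + record);
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        SHARDS.put("shard-0", new ConcurrentLinkedQueue<>(List.of("a", "b")));
        SHARDS.put("shard-1", new ConcurrentLinkedQueue<>(List.of("c")));

        // One independent worker per shard: this is the unit that scales.
        ExecutorService pool = Executors.newFixedThreadPool(SHARDS.size());
        for (String shardId : SHARDS.keySet()) {
            pool.submit(() -> drainShard(shardId));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```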

If you need to align events from multiple shards, you can do that using a state service such as Redis or DynamoDB.
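
As a sketch of that idea: each per-shard worker publishes how far it has read (a timestamp or sequence position) into a shared store, and consumers only advance to the minimum of those values, so no shard is considered "aligned" past a point another shard hasn't reached. A `ConcurrentHashMap` stands in below for the external state service (Redis/DynamoDB); the class and method names are illustrative, not from any library.

```java
import java.util.concurrent.ConcurrentHashMap;

// Each shard worker reports how far it has read; the watermark is the
// minimum across shards, i.e. the point up to which ALL shards are aligned.
public class ShardWatermark {
    // Stand-in for an external state service such as Redis or DynamoDB.
    private final ConcurrentHashMap<String, Long> progress = new ConcurrentHashMap<>();

    // Called by the worker for shardId after processing a batch.
    public void report(String shardId, long timestamp) {
        progress.merge(shardId, timestamp, Math::max);
    }

    // Safe point: every reporting shard has been read at least this far.
    public long watermark() {
        return progress.values().stream().mapToLong(Long::longValue).min().orElse(0L);
    }

    public static void main(String[] args) {
        ShardWatermark w = new ShardWatermark();
        w.report("shard-0", 1700000005L);
        w.report("shard-1", 1700000002L);
        w.report("shard-0", 1700000009L); // shard-0 runs ahead
        System.out.println("aligned up to: " + w.watermark()); // bounded by shard-1
    }
}
```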

Upvotes: 1
