Reputation: 55
I have set of records under some specific shards in the Kinesis stream. I m using KCL 2.x consumer to consume records from kinesis, but the issue is that consumer is fetching me records from all the shards available in the stream. So is there any way i can specify shards or their ID's while configuring the configBuilder object or KCL consumer, so that only records from specified shards are consumed.
Sample Code :
configsBuilder = new ConfigsBuilder(
applicationName,
streamName,
kinesisAsyncClient,
dynamoDbClient,
cloudWatchClient,
workerID,
new RecordProcessorFactory());
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configBuilder.retrievalConfig()
);
// start the kinesis records consumer.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
Thanks in advance!
Upvotes: 1
Views: 868
Reputation: 1641
KCL 2.x provides a ShardPrioritization
interface which allows to prioritize or filter the shards:
/**
* Provides logic to prioritize or filter shards before their execution.
*/
public interface ShardPrioritization {
/**
* Returns new list of shards ordered based on their priority.
* Resulted list may have fewer shards compared to original list
*
* @param original
* list of shards needed to be prioritized
* @return new list that contains only shards that should be processed
*/
List<ShardInfo> prioritize(List<ShardInfo> original);
}
That's said, you can provide the ShardPrioritization
implementation which will leave only shards relevant for you.
After that, just specify your prioritizer in coordinator config:
configsBuilder.coordinatorConfig
.shardPrioritization(new CustomShardsPrioritixation())
Upvotes: 1