Reputation: 1991
I am having trouble checking the data I am writing to Kinesis. It seems like the following example should work, but I am getting an empty list returned from get_records (in the Records field). Any ideas what could be going on?
import uuid
import boto3
import time
streamname = 'mytestStream'
session = boto3.session.Session()
kinesis_client = session.client('kinesis', region_name='us-east-1')
##### WRITE TO KINESIS
partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)
time.sleep(5)
##### READ FROM KINESIS
shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)
Thanks!
Upvotes: 4
Views: 1605
Reputation: 3649
If you'll use LATEST checkpoint, you should first start reading the stream, then place the record. In your example the timeline is as follows;
To fix this, you should run the producer and the consumer in different threads. The correct flow should be like this;
Upvotes: 4