Saurabh Kukreti

Reputation: 119

How to upload data to Kinesis from the Python SDK using boto3

How can I upload data from a CSV file to AWS Kinesis using boto3?

I have tried three methods, and all of them work for me:

  1. Upload data from a CSV file to Kinesis in chunks.
  2. Upload randomly generated data from a local machine to Kinesis.
  3. Upload CSV data row by row from a local machine to Kinesis using boto3.

Also, how can I consume data from Kinesis using the Python SDK?

Upvotes: 6

Views: 6498

Answers (4)

Saurabh Kukreti

# Consumer using the Python 3 SDK (boto3)
import json
import time

import boto3

my_stream_name = 'Flight-Simulator'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Describe the stream; the JSON response contains the shard IDs
response = kinesis_client.describe_stream(StreamName=my_stream_name)
my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

# 'LATEST' starts just after the most recent record, so only new records are read
shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                  ShardId=my_shard_id,
                                                  ShardIteratorType='LATEST')

my_shard_iterator = shard_iterator['ShardIterator']

# Keep polling until the shard is closed (NextShardIterator is then missing/None)
while my_shard_iterator:
    record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                                 Limit=2)
    # Print every record in the batch, not just the first one
    for record in record_response['Records']:
        print(json.loads(record['Data']))

    my_shard_iterator = record_response.get('NextShardIterator')
    time.sleep(5)
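The consumer above reads only the first shard and starts at `LATEST`, so it sees only records written after it starts. The following is a minimal sketch, assuming you want every shard and the oldest retained data: it lists shards with `list_shards` and opens each one with `TRIM_HORIZON`. The helper names `list_shard_ids` and `read_all_shards` and the `max_polls`/`poll_interval` parameters are hypothetical; the client is passed in so the functions can be tried against a stub. The default `poll_interval` of 0.2 s assumes the Kinesis limit of five `GetRecords` calls per second per shard.

```python
import json
import time
from typing import Any, Dict, Iterator, List


def list_shard_ids(client: Any, stream_name: str) -> List[str]:
    """Return every shard ID in the stream, following ListShards pagination."""
    shard_ids: List[str] = []
    response = client.list_shards(StreamName=stream_name)
    while True:
        shard_ids.extend(shard["ShardId"] for shard in response["Shards"])
        next_token = response.get("NextToken")
        if not next_token:
            return shard_ids
        # When NextToken is passed, ListShards must not also receive StreamName
        response = client.list_shards(NextToken=next_token)


def read_all_shards(client: Any, stream_name: str, max_polls: int = 10,
                    poll_interval: float = 0.2) -> Iterator[Dict[str, Any]]:
    """Yield decoded JSON records from each shard in turn, oldest first.

    max_polls (hypothetical) caps GetRecords calls per shard, because an
    open shard always hands back another NextShardIterator.
    """
    for shard_id in list_shard_ids(client, stream_name):
        iterator = client.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",  # oldest record still retained
        )["ShardIterator"]
        polls = 0
        # A closed shard stops returning NextShardIterator once fully read
        while iterator and polls < max_polls:
            response = client.get_records(ShardIterator=iterator, Limit=100)
            for record in response["Records"]:
                # boto3 returns Data as bytes; json.loads accepts bytes
                yield json.loads(record["Data"])
            iterator = response.get("NextShardIterator")
            polls += 1
            time.sleep(poll_interval)  # stay under 5 GetRecords calls/sec/shard
```

With a real client, `for item in read_all_shards(boto3.client('kinesis', region_name='us-east-1'), 'Flight-Simulator'): print(item)` prints each decoded row. Shards are read one after another for simplicity; a long-running consumer over many shards would normally poll them concurrently or use the Kinesis Client Library, which manages shard leases and checkpoints.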

Upvotes: 1

Saurabh Kukreti

Method 3: Row by row from CSV to Kinesis
# Sending the data from CSV to the Kinesis data stream row by row
import csv
import json

import boto3

my_stream_name = 'Flight-Simulator'
thing_id = 'XYZ'  # used as the partition key for every record
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

with open("flights_Test.csv", newline="") as f:
    # DictReader yields each row as a dict keyed by the CSV header
    reader = csv.DictReader(f)
    for row in reader:
        # One PutRecord API call per row
        put_response = kinesis_client.put_record(
            StreamName=my_stream_name,
            Data=json.dumps(row),
            PartitionKey=thing_id)
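In Method 3 every row uses the same `PartitionKey` (`thing_id = 'XYZ'`), and Kinesis maps a partition key to a shard by hashing it, so all rows go to one shard. A minimal sketch, assuming the CSV has a column whose values vary: `rows_to_entries` and its `key_column` argument (for example a hypothetical `FLIGHT_NUMBER` column) are illustrative names, not part of boto3. Each entry holds the `Data` and `PartitionKey` fields that `put_record` and `put_records` both accept.

```python
import csv
import json
from typing import Dict, Iterable, List


def rows_to_entries(csv_lines: Iterable[str], key_column: str) -> List[Dict[str, str]]:
    """Convert CSV rows into Kinesis entries keyed by one of the row's columns.

    csv_lines can be an open file (opened with newline="") or any iterable
    of CSV lines; key_column is a hypothetical column name.
    """
    entries = []
    for row in csv.DictReader(csv_lines):
        entries.append({
            "Data": json.dumps(row),
            # Kinesis partition keys must be non-empty, so fall back to a
            # placeholder (an arbitrary choice) when the column is blank
            "PartitionKey": row[key_column] or "unknown",
        })
    return entries
```

Each entry can be sent as in Method 3 with `kinesis_client.put_record(StreamName=my_stream_name, **entry)`, or a list of them can be passed as `Records=` to `put_records`. Keying on something like a flight number keeps one flight's records ordered on a single shard while spreading different flights across shards.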

Upvotes: 1

Saurabh Kukreti

Method 2: Randomly generated JSON to Kinesis
# Generating random records and sending them to the Kinesis data stream

import boto3
import json
from datetime import datetime
import calendar
import random
import time

my_stream_name = 'Flight-Simulator'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

def put_to_stream(thing_id, property_value, property_timestamp):
    # The function body must be indented, otherwise Python raises IndentationError
    payload = {
        'prop': str(property_value),
        'timestamp': str(property_timestamp),
        'thing_id': thing_id
    }

    print(payload)

    put_response = kinesis_client.put_record(
        StreamName=my_stream_name,
        Data=json.dumps(payload),
        PartitionKey=thing_id)

while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 seconds
    time.sleep(5)
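Method 2 loops forever, which makes the payload logic hard to check on its own. A minimal sketch that separates building payloads from sending them: `make_payload` and `random_payloads` are hypothetical helpers, the optional seeded `random.Random` is only there for reproducibility, and `int(time.time())` is used as an equivalent of `calendar.timegm(datetime.utcnow().timetuple())` (both give whole seconds since the Unix epoch in UTC, and `datetime.utcnow()` is deprecated as of Python 3.12).

```python
import random
import time
from typing import Dict, Iterator, Optional


def make_payload(thing_id: str, property_value: int,
                 property_timestamp: int) -> Dict[str, str]:
    """Build the same payload shape as put_to_stream (all values as strings)."""
    return {
        "prop": str(property_value),
        "timestamp": str(property_timestamp),
        "thing_id": thing_id,
    }


def random_payloads(count: int, thing_id: str = "aa-bb",
                    rng: Optional[random.Random] = None) -> Iterator[Dict[str, str]]:
    """Yield `count` payloads, each with a random reading between 40 and 120."""
    rng = rng or random.Random()
    for _ in range(count):
        # Whole seconds since the Unix epoch (UTC)
        yield make_payload(thing_id, rng.randint(40, 120), int(time.time()))
```

Each payload can then be sent exactly as in Method 2, with `kinesis_client.put_record(StreamName=my_stream_name, Data=json.dumps(payload), PartitionKey=payload['thing_id'])` and a `time.sleep(5)` between calls to keep the original pacing.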

Upvotes: 1

Saurabh Kukreti

Method 1: Chunk by chunk

import csv
import json

import boto3


def chunkit(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


kinesis = boto3.client("kinesis", region_name="us-east-1")
with open("flights.csv", newline="") as f:
    # DictReader yields each row as a dict keyed by the CSV header
    reader = csv.DictReader(f)
    # Build one PutRecords entry per row, then group the entries into chunks.
    # I am sending 100 rows together (PutRecords accepts at most 500 per call).
    records = chunkit([{"PartitionKey": 'sau', "Data": json.dumps(row)} for row in reader], 100)
    for chunk in records:
        response = kinesis.put_records(StreamName="Flight-Simulator", Records=chunk)
        # PutRecords can partially fail; failed entries are not retried here
        if response["FailedRecordCount"]:
            print(f"{response['FailedRecordCount']} records failed in this chunk")
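`put_records` is not all-or-nothing: the response's `FailedRecordCount` and the per-record `ErrorCode` (for example `ProvisionedThroughputExceededException`) report which entries were rejected, and those have to be resent. The per-record results come back in the same order as the request, which is what lets failures be matched to entries. A minimal sketch, assuming a short linearly growing delay between attempts is acceptable; `put_records_with_retry`, `max_attempts` and `backoff_seconds` are hypothetical names.

```python
import time
from typing import Any, Dict, List


def put_records_with_retry(client: Any, stream_name: str,
                           entries: List[Dict[str, Any]],
                           max_attempts: int = 3,
                           backoff_seconds: float = 1.0) -> List[Dict[str, Any]]:
    """Send entries with PutRecords, resending only the ones that failed.

    Returns the entries that still failed after max_attempts (empty on success).
    """
    if len(entries) > 500:
        raise ValueError("PutRecords accepts at most 500 records per call")
    pending = list(entries)
    for attempt in range(max_attempts):
        if not pending:
            break
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []
        # Results are in request order; rejected entries carry an ErrorCode
        pending = [entry for entry, result in zip(pending, response["Records"])
                   if "ErrorCode" in result]
        if attempt < max_attempts - 1:
            time.sleep(backoff_seconds * (attempt + 1))  # wait longer each time
    return pending
```

Inside the chunk loop, `failed = put_records_with_retry(kinesis, "Flight-Simulator", chunk)` could replace the bare `put_records` call; anything left in `failed` can be logged or written aside for later.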

Upvotes: 3
