Reputation: 1302
In my current project, my objective is to detect different objects from a Stream of Frames. The video frames are captured using camera, connected with the Raspberry PI.
The following is a rough architecture.
The architecture design is as follows:
video_cap.py
code is running on raspberry PI. This code sends a stream of images to the Kinesis Data Stream (called it FrameStream
) in AWS.
The FrameStream
Data Stream receives the stream and trigger to a lambda function (named lambda_function.py
) . The lambda function is written using Python 3.7
.
This lambda function receives streams of images, and trigger AWS Rekognition and sends an email notification.
My problem is even if I stop (by pressing Ctrl + C
) ( video_cap.py
python file, running on raspberry PI), the lambda function keep writing logs (reporting old received frames) into CloudWatch.
Please help me - how can I fix this issues? Please let me know if you need any additional information.
video_cap.py
file code
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
# http://aws.amazon.com/asl/
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.
import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
kinesis_client = boto3.client("kinesis")
rekog_client = boto3.client("rekognition")
camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
rekog_max_labels = 123
rekog_min_conf = 50.0
#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
try:
#convert opencv Mat to jpg image
#print "----FRAME---"
retval, buff = cv2.imencode(".jpg", frame)
img_bytes = bytearray(buff)
utc_dt = pytz.utc.localize(datetime.datetime.now())
now_ts_utc = (utc_dt - datetime.datetime(1970, 1, 1, tzinfo=pytz.utc)).total_seconds()
frame_package = {
'ApproximateCaptureTime' : now_ts_utc,
'FrameCount' : frame_count,
'ImageBytes' : img_bytes
}
if write_file:
print("Writing file img_{}.jpg".format(frame_count))
target = open("img_{}.jpg".format(frame_count), 'w')
target.write(img_bytes)
target.close()
#put encoded image in kinesis stream
if enable_kinesis:
print "Sending image to Kinesis"
response = kinesis_client.put_record(
StreamName="FrameStream",
Data=cPickle.dumps(frame_package),
PartitionKey="partitionkey"
)
print response
if enable_rekog:
response = rekog_client.detect_labels(
Image={
'Bytes': img_bytes
},
MaxLabels=rekog_max_labels,
MinConfidence=rekog_min_conf
)
print response
except Exception as e:
print e
def main():
argv_len = len(sys.argv)
if argv_len > 1 and sys.argv[1].isdigit():
capture_rate = int(sys.argv[1])
cap = cv2.VideoCapture(0) #Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
pool = Pool(processes=3)
frame_count = 0
while True:
# Capture frame-by-frame
ret, frame = cap.read()
#cv2.resize(frame, (640, 360));
if ret is False:
break
if frame_count % capture_rate == 0:
result = pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))
frame_count += 1
# Display the resulting frame
cv2.imshow('frame', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# When everything done, release the capture
cap.release()
cv2.destroyAllWindows()
return
if __name__ == '__main__':
main()
Lambda function (lambda_function.py
)
from __future__ import print_function
import base64
import json
import logging
import _pickle as cPickle
#import time
from datetime import datetime
import decimal
import uuid
import boto3
from copy import deepcopy
logger = logging.getLogger()
logger.setLevel(logging.INFO)
rekog_client = boto3.client('rekognition')
# S3 Configuration
s3_client = boto3.client('s3')
s3_bucket = "bucket-name-XXXXXXXXXXXXX"
s3_key_frames_root = "frames/"
# SNS Configuration
sns_client = boto3.client('sns')
label_watch_sns_topic_arn = "SNS-ARN-XXXXXXXXXXXXXXXX"
#Iterate on rekognition labels. Enrich and prep them for storage in DynamoDB
labels_on_watch_list = []
labels_on_watch_list_set = []
text_list_set = []
# List for detected text
text_list = []
def process_image(event, context):
# Start of for Loop
for record in event['Records']:
frame_package_b64 = record['kinesis']['data']
frame_package = cPickle.loads(base64.b64decode(frame_package_b64))
img_bytes = frame_package["ImageBytes"]
approx_capture_ts = frame_package["ApproximateCaptureTime"]
frame_count = frame_package["FrameCount"]
now_ts = datetime.now()
frame_id = str(uuid.uuid4())
approx_capture_timestamp = decimal.Decimal(approx_capture_ts)
year = now_ts.strftime("%Y")
mon = now_ts.strftime("%m")
day = now_ts.strftime("%d")
hour = now_ts.strftime("%H")
#=== Object Detection from an Image =====
# AWS Rekognition - Label detection from an image
rekog_response = rekog_client.detect_labels(
Image={
'Bytes': img_bytes
},
MaxLabels=10,
MinConfidence= 90.0
)
logger.info("Rekognition Response" + str(rekog_response) )
for label in rekog_response['Labels']:
lbl = label['Name']
conf = label['Confidence']
labels_on_watch_list.append(deepcopy(lbl))
labels_on_watch_list_set = set(labels_on_watch_list)
#print(labels_on_watch_list)
logger.info("Labels on watch list ==>" + str(labels_on_watch_list_set) )
# Vehicle Detection
#if (lbl.upper() in (label.upper() for label in ["Transportation", "Vehicle", "Van" , "Ambulance" , "Bus"]) and conf >= 50.00):
#labels_on_watch_list.append(deepcopy(label))
#=== Detecting text from a detected Object
# Detect text from the detected vehicle using detect_text()
response=rekog_client.detect_text( Image={ 'Bytes': img_bytes })
textDetections=response['TextDetections']
for text in textDetections:
text_list.append(text['DetectedText'])
text_list_set = set(text_list)
logger.info("Text Detected ==>" + str(text_list_set))
# End of for Loop
# SNS Notification
if len(labels_on_watch_list_set) > 0 :
logger.info("I am in SNS Now......")
notification_txt = 'On {} Vehicle was spotted with {}% confidence'.format(now_ts.strftime('%x, %-I:%M %p %Z'), round(label['Confidence'], 2))
resp = sns_client.publish(TopicArn=label_watch_sns_topic_arn,
Message=json.dumps(
{
"message": notification_txt + " Detected Object Categories " + str(labels_on_watch_list_set) + " " + " Detect text on the Object " + " " + str(text_list_set)
}
))
#Store frame image in S3
s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(year, mon, day, hour, frame_id)
s3_client.put_object(
Bucket=s3_bucket,
Key=s3_key,
Body=img_bytes
)
print ("Successfully processed records.")
return {
'statusCode': 200,
'body': json.dumps('Successfully processed records.')
}
def lambda_handler(event, context):
logger.info("Received event from Kinesis ......" )
logger.info("Received event ===>" + str(event))
return process_image(event, context)
The following is IAM policy attached with the Lambda role.
The following is the Kinesis Data Stream Log (Dated 17th August, 2019 - 1:54 PM IST). The last time, the data ingested through Raspberry PI on 16th August, 2019 - 6:45 PM)
Upvotes: 1
Views: 853
Reputation: 1832
It looks like you have about 117K records in the stream but slowly processing 1 records at time. How long does it take the lambda to process one record? I would get how long your lambda runs , update the python put code to sleep a little longer the lambda runs (start with 20% longer), then restart with an empty queue, and watch the stats in real time.
Upvotes: 2