Reputation: 53
I'm currently working on a project where I am required to write sensor data to AWS Timestream. I could use AWS IoT Core (MQTT) and inject that into Timestream, however, would prefer making a direct insert into Timestream. I also can't use the SDK/boto3 as I'm using an ESP32 with micropython.
Unfortunately I can't seem to find any information regarding the AWS Timestream API. I know it is available as stated here: Timestream API Reference
Has anyone perhaps got an example. I have accomplished this using influxdb using the following setup:
post_header = {
'Authorization': 'Token %s' % token,
'Accept-Encoding': 'gzip'
}
get_header = {
'Authorization': 'Token %s' % token,
'Accept': 'application/csv',
'Content-type': 'application/vnd.flux',
'Accept-Encoding': 'gzip'
}
influx_endpoint = "https://us-west-1-1.aws.cloud2.influxdata.com/api/v2/write?org=%s&bucket=%s" % (user.user_name, bucket)
response = urequests.post(influx_endpoint,
data=data,
headers=user.post_header)
However would prefer using AWS. Any help would be appreciated.
Thanks in advance.
Upvotes: 2
Views: 1348
Reputation: 53
I ended up writing a lambda function which could be triggered using API Gateway.
Here is the Lambda Function:
import boto3
from datetime import datetime
TABLE_NAME = "temp_table"
DATABASE_NAME = "temp_database"
def lambda_handler(event, context):
temp = SendData()
temp.populate_dimension(event["fleet_id"])
timestamp=datetime(int(event["yy"]),int(event["mm"]),int(event["dd"]),int(event["h"]),int(event["m"]),int(float(event["s"]))).timestamp()
temp.populate_records(timestamp, event["lat"], event["long"], event["speed"], event["direction"], event["seen_fleet_id"], event["acknowledge_time"])
temp.send()
class SendData():
def __init__(self):
self.client = boto3.client('timestream-write')
def populate_dimension(self, fleet_id):
self.dimension = [
{'Name': 'fleet_id', 'Value': '%s' % fleet_id},
]
def populate_records(self, timestamp, lat, long, speed, direction, seen_fleet_id=None, acknowledge_time=None):
current_time = str(int(timestamp * 1000))
if seen_fleet_id != None:
record = {
'Dimensions': self.dimension,
'MeasureName': 'fleet_data',
'MeasureValues': [
{
"Name": "location_lat",
"Value": '%s' % lat,
"Type": "DOUBLE"
},
{
"Name": "location_long",
"Value": '%s' % long,
"Type": "DOUBLE"
},
{
"Name": "current_speed",
"Value": '%s' % speed,
"Type": "DOUBLE"
},
{
"Name": "current_direction",
"Value": '%s' % direction,
"Type": "VARCHAR"
},
{
"Name": "seen_fleet_id",
"Value": '%s' % seen_fleet_id,
"Type": "VARCHAR"
},
{
"Name": "acknowledge_time",
"Value": '%s' % acknowledge_time,
"Type": "DOUBLE"
}
],
'MeasureValueType': 'MULTI',
'Time': current_time
}
else:
record = {
'Dimensions': self.dimension,
'MeasureName': 'fleet_data',
'MeasureValues': [
{
"Name": "location_lat",
"Value": '%s' % lat,
"Type": "DOUBLE"
},
{
"Name": "location_long",
"Value": '%s' % long,
"Type": "DOUBLE"
},
{
"Name": "current_speed",
"Value": '%s' % speed,
"Type": "DOUBLE"
},
{
"Name": "current_direction",
"Value": '%s' % direction,
"Type": "VARCHAR"
}
],
'MeasureValueType': 'MULTI',
'Time': current_time
}
self.records = [record]
def send(self):
try:
result = self.client.write_records(DatabaseName=DATABASE_NAME,
TableName=TABLE_NAME,
Records=self.records)
if result['ResponseMetadata']['HTTPStatusCode'] == 200:
return True
else:
return False
except self.client.exceptions.RejectedRecordsException as err:
print("RejectedRecords: ", err)
for rr in err.response["RejectedRecords"]:
print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
print("Other records were written successfully. ")
return False
except Exception as err:
print("Error:", err)
return False
Upvotes: 2