Eckaard

Reputation: 53

Insert into AWS Timestream using API

I'm working on a project that requires me to write sensor data to AWS Timestream. I could route the data through AWS IoT Core (MQTT) and ingest it into Timestream from there, but I would prefer to insert directly into Timestream. I also can't use the SDK/boto3 because I'm running MicroPython on an ESP32.

Unfortunately, I can't find any examples of calling the AWS Timestream API directly. I know the API exists, as described here: Timestream API Reference

Does anyone have an example? I have accomplished this with InfluxDB using the following setup:

post_header = {
    'Authorization': 'Token %s' % token,
    'Accept-Encoding': 'gzip'
}
get_header = {
    'Authorization': 'Token %s' % token,
    'Accept': 'application/csv',
    'Content-type': 'application/vnd.flux',
    'Accept-Encoding': 'gzip'
}

influx_endpoint = "https://us-west-1-1.aws.cloud2.influxdata.com/api/v2/write?org=%s&bucket=%s" % (user.user_name, bucket)
response = urequests.post(influx_endpoint,
                          data=data,
                          headers=user.post_header)

However, I would prefer to use AWS. Any help would be appreciated.

Thanks in advance.

Upvotes: 2

Views: 1348

Answers (1)

Eckaard

Reputation: 53

I ended up writing a Lambda function that is triggered through API Gateway, so the ESP32 only needs to send an ordinary HTTP request.

Here is the Lambda Function:

import boto3
from datetime import datetime

TABLE_NAME = "temp_table"
DATABASE_NAME = "temp_database"


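def lambda_handler(event, context):
    temp = SendData()
    temp.populate_dimension(event["fleet_id"])
    # Build a Unix timestamp from the separate date/time fields.
    # The naive datetime is interpreted in the runtime's local time zone.
    timestamp = datetime(int(event["yy"]), int(event["mm"]), int(event["dd"]),
                         int(event["h"]), int(event["m"]), int(float(event["s"]))).timestamp()
    # seen_fleet_id and acknowledge_time are optional, so use .get() to avoid a KeyError.
    temp.populate_records(timestamp, event["lat"], event["long"], event["speed"], event["direction"],
                          event.get("seen_fleet_id"), event.get("acknowledge_time"))
    # Return True/False so the caller can tell whether the write succeeded.
    return temp.send()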
    

class SendData():
    def __init__(self):
        self.client = boto3.client('timestream-write')

    def populate_dimension(self, fleet_id):
        self.dimension = [
            {'Name': 'fleet_id', 'Value': '%s' % fleet_id},
        ]

    def populate_records(self, timestamp, lat, long, speed, direction, seen_fleet_id=None, acknowledge_time=None):
        # Timestream takes the time as a string; the default unit is milliseconds.
        current_time = str(int(timestamp * 1000))

        # Measures that are present on every record.
        measure_values = [
            {"Name": "location_lat", "Value": '%s' % lat, "Type": "DOUBLE"},
            {"Name": "location_long", "Value": '%s' % long, "Type": "DOUBLE"},
            {"Name": "current_speed", "Value": '%s' % speed, "Type": "DOUBLE"},
            {"Name": "current_direction", "Value": '%s' % direction, "Type": "VARCHAR"},
        ]

        # Only add the sighting measures when another fleet was actually seen.
        if seen_fleet_id is not None:
            measure_values += [
                {"Name": "seen_fleet_id", "Value": '%s' % seen_fleet_id, "Type": "VARCHAR"},
                {"Name": "acknowledge_time", "Value": '%s' % acknowledge_time, "Type": "DOUBLE"},
            ]

        record = {
            'Dimensions': self.dimension,
            'MeasureName': 'fleet_data',
            'MeasureValues': measure_values,
            'MeasureValueType': 'MULTI',
            'Time': current_time
        }
        self.records = [record]

    def send(self):
        try:
            result = self.client.write_records(DatabaseName=DATABASE_NAME,
                                               TableName=TABLE_NAME,
                                               Records=self.records)
            # True only when Timestream acknowledged the write with HTTP 200.
            return result['ResponseMetadata']['HTTPStatusCode'] == 200
        except self.client.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
            print("Other records were written successfully. ")
            return False
        except Exception as err:
            print("Error:", err)
            return False
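
The handler rebuilds the timestamp from separate date and time fields, and Timestream receives it as a millisecond string. Here is a minimal sketch of that conversion on its own, assuming the device reports its time in UTC. The helper name event_to_epoch_millis and the sample event are hypothetical and not part of the function above:

```python
from datetime import datetime, timezone


def event_to_epoch_millis(event):
    """Convert the event's date/time parts to epoch milliseconds as a string.

    Assumes the parts are in UTC; the helper name and sample are illustrative.
    """
    moment = datetime(
        int(event["yy"]), int(event["mm"]), int(event["dd"]),
        int(event["h"]), int(event["m"]), int(float(event["s"])),
        tzinfo=timezone.utc,  # explicit, so the result does not depend on the host time zone
    )
    return str(int(moment.timestamp() * 1000))


# Hypothetical sample: 2021-03-01 12:00:00 UTC, with fields sent as strings.
sample_event = {"yy": "2021", "mm": "3", "dd": "1", "h": "12", "m": "0", "s": "0.0"}
print(event_to_epoch_millis(sample_event))  # 1614600000000
```

Passing tzinfo explicitly is optional, but it keeps the result the same wherever the code runs.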

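On the device side, the function can be reached with a plain HTTPS POST, much like the InfluxDB call in the question. Here is a rough sketch, assuming the API Gateway integration passes the JSON body to the function as event (a non-proxy integration). The URL is a placeholder, and build_payload and send_reading are hypothetical helper names. The post argument is passed in so the same code can run with urequests.post on MicroPython or with a stub on a desktop:

```python
import json  # on older MicroPython builds this module may be named ujson

# Placeholder: replace with the invoke URL of your own API Gateway stage.
API_URL = "https://example.execute-api.us-east-1.amazonaws.com/prod/fleet"


def build_payload(fleet_id, t, lat, long, speed, direction,
                  seen_fleet_id=None, acknowledge_time=None):
    """Build the JSON body with the keys the Lambda handler reads.

    `t` is a (year, month, day, hour, minute, second) tuple.
    The two optional fields are always included (as null when unset),
    so the handler can index them without a KeyError.
    """
    yy, mm, dd, h, m, s = t
    return json.dumps({
        "fleet_id": fleet_id,
        "yy": yy, "mm": mm, "dd": dd, "h": h, "m": m, "s": s,
        "lat": lat, "long": long, "speed": speed, "direction": direction,
        "seen_fleet_id": seen_fleet_id,
        "acknowledge_time": acknowledge_time,
    })


def send_reading(post, payload, url=API_URL):
    """POST the payload; `post` is e.g. urequests.post on MicroPython."""
    response = post(url, data=payload, headers={"Content-Type": "application/json"})
    return response.status_code == 200
```

On the ESP32 this would be called as send_reading(urequests.post, build_payload(...)). If the API Gateway stage requires an API key or other authorization, it would need to be added to the headers.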
Upvotes: 2
