ima747

Reputation: 4667

Backfill AWS Kinesis Firehose to Elasticsearch Service failed records

We have a firehose that sends records to an Elasticsearch Service cluster. Our cluster filled up and some records failed over to S3. The documentation at https://docs.aws.amazon.com/firehose/latest/dev/basic-deliver.html#retry indicates that failed records can be used to backfill: "The skipped documents are delivered to your S3 bucket in the elasticsearch_failed/ folder, which you can use for manual backfill", but I haven't been able to find any documentation on how to accomplish this.

Looking at the records, they appear to be gzipped text files containing JSON blobs with a "rawData" field holding a base64-encoded string of the original record we sent to Firehose.
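For illustration, decoding one of these files by hand looks something like this (a sketch based on the layout described above; the bucket and key are placeholders):

import base64
import gzip
import json

import boto3

s3 = boto3.client("s3")

# Placeholder bucket/key for one failure file under elasticsearch_failed/
obj = s3.get_object(Bucket="my-firehose-backup-bucket",
                    Key="elasticsearch_failed/2019/01/01/00/failures")

# Each line of the decompressed object is a JSON blob describing one failed record
text = gzip.decompress(obj["Body"].read()).decode("utf-8")
for line in filter(None, text.split("\n")):
    failure = json.loads(line)
    original_record = base64.b64decode(failure["rawData"])  # the record originally sent to Firehose
    print(failure["esIndexName"], failure["esDocumentId"], original_record)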

Is there an existing tool to process these gzip files out of S3, break them down, and re-submit the records? The documentation implies that you can "just manually backfill", and it's a pretty standardized flow, so my assumption is that someone has done this before, but I haven't been able to find how.

Upvotes: 7

Views: 1633

Answers (2)

sachin roy

Reputation: 52

I had the same problem and modified the script from the other answer to backfill failed documents (those with error code 403) to an existing Elasticsearch instance:

import base64
import json

import boto3
import requests

s3_client = boto3.client('s3', region_name="xx-xx-x", aws_access_key_id="xxxx", aws_secret_access_key="xxxx")
s3keys = s3_client.list_objects(Bucket="bucketname", Prefix='path/to/folder/file')

for s3key in s3keys['Contents']:
    print(s3key['Key'])
    file = s3_client.get_object(Bucket="bucketname", Key=s3key['Key'])
    # If the objects are gzip-compressed, decompress before decoding (e.g. gzip.decompress)
    text = file['Body'].read().decode("utf-8")
    # Each non-empty line is one JSON failure record written by Firehose
    failure_cases = [json.loads(line) for line in text.split('\n') if line]
    for case in failure_cases:
        data = base64.b64decode(case['rawData'])
        esid = case['esDocumentId']
        esIndexName = case['esIndexName']
        doc = data.decode('utf-8')
        url = "https://es-domain-name/%s/_doc/%s" % (esIndexName, esid)
        headers = {"Content-Type": "application/json", "Accept-Charset": "UTF-8"}
        # Only re-submit documents that were rejected with a 403
        if case['errorCode'] == '403':
            try:
                print(case['errorCode'])
                r = requests.post(url, data=doc, headers=headers, auth=('user', 'password'))
                print(r.json())
            except Exception:
                # Keep going if a single document fails to index
                pass

Upvotes: 1

Nimrod Kor

Reputation: 9

I suppose manual backfill means using one of the SDKs to send the documents into Elasticsearch again. An example in Python of reading a failure file from S3 (with boto3) and re-indexing the documents it contains (with the elasticsearch Python client):

import base64
import json
import logging

import boto3
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import RequestError

logger = logging.getLogger(__name__)

# boto3 only reads the failure file from S3; indexing goes through the elasticsearch client
es_instance = Elasticsearch(ES_ENDPOINT)  # configure auth/signing for your domain as needed
s3_client = boto3.client('s3', region_name=REGION, aws_access_key_id=ACCESS_KEY_ID, aws_secret_access_key=SECRET_ACCESS_KEY)

file = s3_client.get_object(Bucket=bucket, Key=key)
text = file['Body'].read().decode("utf-8")
# Each non-empty line is one JSON failure record
failure_cases = [json.loads(line) for line in text.split('\n') if line]

for case in failure_cases:
    try:
        data = base64.b64decode(case['rawData'])
        es_instance.create(index=case['esIndexName'], id=case['esDocumentId'], body=data)
        logger.debug("Successfully sent {}".format(case['esDocumentId']))
    except RequestError:
        logger.info("Retry failed for Document ID {}\nReason: {}"
                    .format(case['esDocumentId'], case['errorMessage']))
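
If the domain only accepts signed requests, one common way to construct that client (a sketch assuming the elasticsearch-py 7.x client and the requests-aws4auth package; the endpoint below is a placeholder) is:

from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

# Placeholder endpoint: your domain's endpoint without the https:// prefix
ES_HOST = "search-mydomain-xxxxxxxx.us-east-1.es.amazonaws.com"

# Sign requests with SigV4 so the domain's access policy accepts them
awsauth = AWS4Auth(ACCESS_KEY_ID, SECRET_ACCESS_KEY, REGION, "es")

es_instance = Elasticsearch(
    hosts=[{"host": ES_HOST, "port": 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)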

Upvotes: 1
