neowulf33

Reputation: 645

How do I index the transformed log records into AWS Elasticsearch?

TLDR

The Lambda function's output cannot be indexed into the AWS managed ES due to an "encoding problem".

Actual Error Response

I do not get any error when I base64 encode a single logEvent from a Firehose record and send the collected records to the AWS managed ES.

See the next section for more details.

The base64-encoded compressed payload is being sent to ES because the resulting JSON transformation is too big for ES to index - see this ES link.

I get the following error from the AWS managed ES:

{
    "deliveryStreamARN": "arn:aws:firehose:us-west-2:*:deliverystream/*",
    "destination": "arn:aws:es:us-west-2:*:domain/*",
    "deliveryStreamVersionId": 1,
    "message": "The data could not be decoded as UTF-8",
    "errorCode": "InvalidEncodingException",
    "processor": "arn:aws:lambda:us-west-2:*:function:*"
  }

If the output record is not compressed, the body size is too long (even at ~14 MB). With an uncompressed, simply base64-encoded payload, I get the following error in the Lambda logs:

{
  "type": "mapper_parsing_exception",
  "reason": "failed to parse",
  "caused_by": {
    "type": "not_x_content_exception",
    "reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
  }
}

Description

I have CloudWatch logs that are buffered by size / interval and fed into a Kinesis Firehose. The Firehose passes the logs to a Lambda function, which transforms each log into a JSON record that should then be sent on to the AWS managed Elasticsearch cluster.

The lambda function gets the following JSON structure:

{
    "invocationId": "cf1306b5-2d3c-4886-b7be-b5bcf0a66ef3",
    "deliveryStreamArn": "arn:aws:firehose:...",
    "region": "us-west-2",
    "records": [{
        "recordId": "49577998431243709525183749876652374166077260049460232194000000",
        "approximateArrivalTimestamp": 1508197563377,
        "data": "some_compressed_data_in_base_64_encoding"
    }]
}

The Lambda function then extracts .records[].data, base64-decodes it, and decompresses it, which yields the following JSON:

{
  "messageType": "DATA_MESSAGE",
  "owner": "aws_account_number",
  "logGroup": "some_cloudwatch_log_group_name",
  "logStream": "i-0221b6ec01af47bfb",
  "subscriptionFilters": [
    "cloudwatch_log_subscription_filter_name"
  ],
  "logEvents": [
    {
      "id": "33633929427703365813575134502195362621356131219229245440",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_1"
    },
    {
      "id": "33633929427703365813575134502195362621356131219229245441",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_2"
    },
    {
      "id": "33633929427703365813575134502195362621356131219229245442",
      "timestamp": 1508197557000,
      "message": "Oct 16 23:45:57 some_log_entry_3"
    }
  ]
}
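
For reference, a minimal sketch of that decode step (Python; assuming the payload is gzip-compressed, which is how CloudWatch Logs subscription data is delivered to Firehose):

import base64
import gzip
import json

def decode_record(record):
    """Base64-decode and decompress one Firehose record's 'data' field."""
    compressed = base64.b64decode(record["data"])
    # Assumption: CloudWatch Logs subscription payloads are gzip-compressed.
    payload = gzip.decompress(compressed)
    return json.loads(payload)

# Inside the transformation Lambda's handler:
# for record in event["records"]:
#     cloudwatch_payload = decode_record(record)
#     for log_event in cloudwatch_payload["logEvents"]:
#         ...  # transform each log event (see the next section)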

Each item from .logEvents[] gets transformed into a JSON structure whose keys are the desired columns when searching logs within Kibana - something like this:

{
    "journalctl_host": "ip-172-11-11-111",
    "process": "haproxy",
    "pid": 15507,
    "client_ip": "172.11.11.111",
    "client_port": 3924,
    "frontend_name": "http-web",
    "backend_name": "server",
    "server_name": "server-3",
    "time_duration": 10,
    "status_code": 200,
    "bytes_read": 79,
    "@timestamp": "1900-10-16T23:46:01.0Z",
    "tags": ["haproxy"],
    "message": "HEAD / HTTP/1.1"
}
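
A hypothetical sketch of that per-event transform (the regex and field names here are illustrative placeholders, not the exact parser used):

import re

# Illustrative pattern for a fragment of an haproxy HTTP log line; the real
# parser depends on the haproxy log format and extracts more fields.
HAPROXY_RE = re.compile(
    r"(?P<client_ip>\d{1,3}(?:\.\d{1,3}){3}):(?P<client_port>\d+)"
)

def transform_log_event(log_event, log_stream):
    """Turn one CloudWatch logEvent into a flat document for Kibana."""
    doc = {
        "journalctl_host": log_stream,
        "tags": ["haproxy"],
        "message": log_event["message"],
        "@timestamp": log_event["timestamp"],  # epoch millis; convert to ISO 8601 if desired
    }
    match = HAPROXY_RE.search(log_event["message"])
    if match:
        doc.update(match.groupdict())
    return doc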

The transformed JSON documents are collected into an array, which is zlib-compressed and base64-encoded, and the resulting string is placed into a new JSON payload as the final Lambda result:

{
"records": [
    {
        "recordId": "49577998431243709525183749876652374166077260049460232194000000",
        "result": "Ok",
        "data": "base64_encoded_zlib_compressed_array_of_transformed_logs"
    }
]}
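
A minimal sketch of that packaging step (Python; it reproduces what is described above - and since the data stays zlib-compressed, it is also what triggers the InvalidEncodingException shown earlier):

import base64
import json
import zlib

def build_result_record(record_id, transformed_docs):
    """Package the transformed documents back into a Firehose result record."""
    payload = json.dumps(transformed_docs).encode("utf-8")
    # Compressing keeps the Lambda response small, but the ES destination
    # expects plain UTF-8 JSON, hence "The data could not be decoded as UTF-8".
    compressed = zlib.compress(payload)
    return {
        "recordId": record_id,
        "result": "Ok",
        "data": base64.b64encode(compressed).decode("utf-8"),
    }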

CloudWatch configuration

13 log entries (~4 KB) can get transformed into about 635 KB.

I have also decreased the thresholds for the awslogs agent, hoping that the size of the logs being sent to the Lambda function would stay small:

buffer_duration = 10
batch_count = 10
batch_size = 500
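
For context, those settings live in the per-stream stanza of the awslogs agent configuration (the file, log group, and stream names below are placeholders):

[/var/log/syslog]
file = /var/log/syslog
log_group_name = some_cloudwatch_log_group_name
log_stream_name = {instance_id}
buffer_duration = 10
batch_count = 10
batch_size = 500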

Unfortunately, when there is a burst, the spike can be upwards of 2800 lines with a size upwards of 1 MB.

When the resulting payload from the Lambda function is "too big" (~13 MB of transformed logs), an error is logged in the Lambda CloudWatch logs - "body size is too long". There doesn't seem to be any indication of where this error is coming from or whether there is a size limit on the Lambda function's response payload.

Upvotes: 3

Views: 1572

Answers (1)

neowulf33

Reputation: 645

So, the AWS support folks have told me that the following limitations can't be mitigated to make this flow work:

  1. the Lambda payload size limit
  2. the size of the compressed Firehose payload coming into Lambda, which is directly proportional to the size of the Lambda output.

Instead, I have modified the architecture to the following:

  1. CloudWatch logs are backed up to S3 via Firehose.
  2. S3 events are processed by the Lambda function.
  3. The Lambda function returns a success code once it has transformed the logs and successfully bulk indexed them into ES (a minimal sketch follows this list).
  4. If the Lambda function fails, a Dead Letter Queue (AWS SQS) is configured with a CloudWatch alarm. A sample CloudFormation snippet can be found here.
  5. If there are SQS messages, one could manually invoke the Lambda function with those messages or set up an AWS Batch job to process the SQS messages with the Lambda function. However, one should be careful that the Lambda function doesn't fail over into the DLQ again. Check the Lambda CloudWatch logs to see why the message was not processed and was sent to the DLQ.
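
A minimal sketch of step 3, assuming the Firehose-delivered S3 objects are gzip-compressed with one log line per line, and that the ES domain's access policy allows the Lambda role without request signing (in practice you would likely sign requests, e.g. with requests-aws4auth). The endpoint, index name, and transform_line helper are placeholders:

import gzip
import json

import boto3
import requests  # packaged with the deployment; not part of the base Lambda runtime

# Placeholder endpoint for the AWS managed ES domain.
ES_BULK_URL = "https://search-my-domain.us-west-2.es.amazonaws.com/_bulk"

s3 = boto3.client("s3")

def transform_line(raw_line):
    # Placeholder transform: in practice, reuse the per-event parsing shown earlier.
    return {"message": raw_line, "tags": ["haproxy"]}

def handler(event, context):
    """Triggered by S3 events for objects written by Firehose."""
    for s3_record in event["Records"]:
        bucket = s3_record["s3"]["bucket"]["name"]
        key = s3_record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

        # Build the newline-delimited _bulk body: action line + document line.
        # Older ES versions may also require a "_type" in the action line.
        lines = []
        for raw_line in gzip.decompress(body).decode("utf-8").splitlines():
            doc = transform_line(raw_line)
            lines.append(json.dumps({"index": {"_index": "logs"}}))
            lines.append(json.dumps(doc))
        bulk_body = "\n".join(lines) + "\n"

        resp = requests.post(
            ES_BULK_URL,
            data=bulk_body,
            headers={"Content-Type": "application/x-ndjson"},
        )
        # An unhandled exception here makes the (async) invocation eligible for
        # retries and, eventually, the SQS dead letter queue from step 4.
        resp.raise_for_status()

    return "ok"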

Upvotes: 3
