Schaumkuesschen

Reputation: 167

Kinesis Firehose to ES using a Lambda transformation

I want to get logs from a CloudWatch Logs subscription filter, put them into an S3 bucket, and send them to Elasticsearch (ES).

Similar to the diagram here:

https://aws.amazon.com/solutions/implementations/centralized-logging/
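
For reference, the log group is wired to the delivery stream roughly like this (just a sketch; logGroup and cloudWatchLogsRole are placeholders for my actual constructs, and the role has to allow CloudWatch Logs to call firehose:PutRecord/PutRecordBatch):

import { CfnSubscriptionFilter } from "@aws-cdk/aws-logs";

// Sketch: forward every event from the log group to the Firehose delivery stream.
// logGroupName, destinationArn and roleArn point at placeholder constructs.
new CfnSubscriptionFilter(this, "LogsToFirehose", {
  logGroupName: logGroup.logGroupName,
  destinationArn: firehoseDeliveryStream.attrArn,
  roleArn: cloudWatchLogsRole.roleArn,
  filterPattern: "", // an empty pattern matches all log events
});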

When I use this function:

/*
 For processing data sent to Firehose by Cloudwatch Logs subscription filters.
 Cloudwatch Logs sends to Firehose records that look like this:
 {
   "messageType": "DATA_MESSAGE",
   "owner": "123456789012",
   "logGroup": "log_group_name",
   "logStream": "log_stream_name",
   "subscriptionFilters": [
     "subscription_filter_name"
   ],
   "logEvents": [
     {
       "id": "01234567890123456789012345678901234567890123456789012345",
       "timestamp": 1510109208016,
       "message": "log message 1"
     },
     {
       "id": "01234567890123456789012345678901234567890123456789012345",
       "timestamp": 1510109208017,
       "message": "log message 2"
     }
     ...
   ]
 }
 The data is additionally compressed with GZIP.
 The code below will:
 1) Gunzip the data
 2) Parse the json
 3) Set the result to ProcessingFailed for any record whose messageType is not DATA_MESSAGE, thus redirecting them to the
    processing error output. Such records do not contain any log events. You can modify the code to set the result to
    Dropped instead to get rid of these records completely.
 4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
    each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
    transformations on the log events.
 5) Concatenate the result from (4) together and set the result as the data of the record returned to Firehose. Note that
    this step will not add any delimiters. Delimiters should be appended by the logic within the transformLogEvent
    method.
 6) Any additional records which exceed 6MB will be re-ingested back into Firehose.
 */
const zlib = require('zlib');
const AWS = require('aws-sdk');

/**
 * logEvent has this format:
 *
 * {
 *   "id": "01234567890123456789012345678901234567890123456789012345",
 *   "timestamp": 1510109208016,
 *   "message": "log message 1"
 * }
 *
 * The default implementation below just extracts the message and appends a newline to it.
 *
 * The result must be returned in a Promise.
 */
function transformLogEvent(logEvent: any) {
  return Promise.resolve(`${logEvent.message}\n`);
}

function putRecordsToFirehoseStream(streamName: any, records: any, client: any, resolve: any, reject: any, attemptsMade: any, maxAttempts: any) {
  client.putRecordBatch({
    DeliveryStreamName: streamName,
    Records: records,
  }, (err: any, data: any) => {
    const codes = [];
    let failed = [];
    let errMsg = err;

    if (err) {
      failed = records;
    } else {
      for (let i = 0; i < data.RequestResponses.length; i++) {
        const code = data.RequestResponses[i].ErrorCode;
        if (code) {
          codes.push(code);
          failed.push(records[i]);
        }
      }
      errMsg = `Individual error codes: ${codes}`;
    }

    if (failed.length > 0) {
      if (attemptsMade + 1 < maxAttempts) {
        console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
        putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
      } else {
        reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
      }
    } else {
      resolve('');
    }
  });
}

function putRecordsToKinesisStream(streamName: any, records: any, client: any, resolve: any, reject: any, attemptsMade: any, maxAttempts: any) {
  client.putRecords({
    StreamName: streamName,
    Records: records,
  }, (err: any, data: any) => {
    const codes = [];
    let failed = [];
    let errMsg = err;

    if (err) {
      failed = records;
    } else {
      for (let i = 0; i < data.Records.length; i++) {
        const code = data.Records[i].ErrorCode;
        if (code) {
          codes.push(code);
          failed.push(records[i]);
        }
      }
      errMsg = `Individual error codes: ${codes}`;
    }

    if (failed.length > 0) {
      if (attemptsMade + 1 < maxAttempts) {
        console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
        putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
      } else {
        reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
      }
    } else {
      resolve('');
    }
  });
}

function createReingestionRecord(isSas: any, originalRecord: any) {
  if (isSas) {
    return {
      Data: Buffer.from(originalRecord.data, 'base64'),
      PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
    };
  } else {
    return {
      Data: Buffer.from(originalRecord.data, 'base64'),
    };
  }
}


function getReingestionRecord(isSas: any, reIngestionRecord: any) {
  if (isSas) {
    return {
      Data: reIngestionRecord.Data,
      PartitionKey: reIngestionRecord.PartitionKey,
    };
  } else {
    return {
      Data: reIngestionRecord.Data,
    };
  }
}

exports.handler = (event: any, context: any, callback: any) => {
  Promise.all(event.records.map(function (r: any) {
    const buffer = Buffer.from(r.data, 'base64');

    let decompressed;
    try {
      decompressed = zlib.unzipSync(buffer);
    } catch (e) {
      return Promise.resolve({
        recordId: r.recordId,
        result: 'ProcessingFailed',
      });
    }

    const data = JSON.parse(decompressed);
    // CONTROL_MESSAGE are sent by CWL to check if the subscription is reachable.
    // They do not contain actual data.
    if (data.messageType === 'CONTROL_MESSAGE') {
      return Promise.resolve({
        recordId: r.recordId,
        result: 'Dropped',
      });
    } else if (data.messageType === 'DATA_MESSAGE') {
      const promises = data.logEvents.map(transformLogEvent);
      return Promise.all(promises)
        .then(transformed => {
          const payload: any = transformed.reduce(function (a: any, v: any) {
            return a + v;
          });
          const encoded = Buffer.from(payload).toString();
          return {
            recordId: r.recordId,
            result: 'Ok',
            data: encoded,
          };
        });
    } else {
      return Promise.resolve({
        recordId: r.recordId,
        result: 'ProcessingFailed',
      });
    }
  })).then(recs => {
    const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
    const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
    const region = streamARN.split(':')[3];
    const streamName = streamARN.split('/')[1];
    const result: any = { records: recs };
    let recordsToReingest = [];
    const putRecordBatches: any = [];
    let totalRecordsToBeReingested = 0;
    const inputDataByRecId: any = {};
    event.records.forEach(function (r: any) { inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r) });

    let projectedSize = recs.filter(function (rec: any) { return rec.result === 'Ok' })
      .map(function (r: any) { return r.recordId.length + r.data.length })
      .reduce((a, b) => a + b, 0);
    // 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
    for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
      const rec: any = result.records[idx];
      if (rec.result === 'Ok') {
        totalRecordsToBeReingested++;
        recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
        projectedSize -= rec.data.length;
        delete rec.data;
        result.records[idx].result = 'Dropped';

        // split out the record batches into multiple groups, 500 records at max per group
        if (recordsToReingest.length === 500) {
          putRecordBatches.push(recordsToReingest);
          recordsToReingest = [];
        }
      }
    }

    if (recordsToReingest.length > 0) {
      // add the last batch
      putRecordBatches.push(recordsToReingest);
    }

    if (putRecordBatches.length > 0) {
      new Promise((resolve, reject) => {
        let recordsReingestedSoFar = 0;
        for (let idx = 0; idx < putRecordBatches.length; idx++) {
          const recordBatch = putRecordBatches[idx];
          if (isSas) {
            const client = new AWS.Kinesis({ region: region });
            putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
          } else {
            const client = new AWS.Firehose({ region: region });
            putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
          }
          recordsReingestedSoFar += recordBatch.length;
          console.log('Reingested %s/%s records out of %s in to %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
        }}).then(
          () => {
            console.log('Reingested all %s records out of %s in to %s stream', totalRecordsToBeReingested, event.records.length, streamName);
            callback(null, result);
          },
          failed => {
            console.log('Failed to reingest records. %s', failed);
            callback(failed, null);
          });
    } else {
      console.log('No records needed to be reingested.');
      callback(null, result);
    }
  }).catch(ex => {
    console.log('Error: ', ex);
    callback(ex, null);
  });
}; 

I get a Lambda.FunctionError:

Check your function and make sure the output is in required format. In addition to that, make sure the processed records contain valid result status of Dropped, Ok, or ProcessingFailed

Does anybody know which function is suitable for receiving logs from the CloudWatch subscription filter and sending them to S3 and ES?

My code for the FirehoseDeliveryStream looks like this:

const firehoseDeliveryStream = new CfnDeliveryStream(this, "FirehoseDeliveryStream", {
      deliveryStreamType: "DirectPut",
      elasticsearchDestinationConfiguration: {
        domainArn: elasticsearchDomain.domainArn,
        roleArn: firehoseDeliveryRole.roleArn,
        indexName: "test",

        s3Configuration: {
          bucketArn: this.logsBucket.bucketArn,
          roleArn: firehoseDeliveryRole.roleArn,
          cloudWatchLoggingOptions: {
            enabled: true,
            logGroupName: firehoseloggroup.logGroupName,
            logStreamName: logstream.logStreamName
          },
        },
        s3BackupMode: "AllDocuments",
        cloudWatchLoggingOptions: {
          enabled: true,
          logGroupName: firehoseloggroup.logGroupName,
          logStreamName: logstream.logStreamName
        },
        processingConfiguration: {
          enabled: true,
          processors: [{
            type: "Lambda",
            parameters: [{
              parameterName: "LambdaArn",
              parameterValue: handler.functionArn,
            }],
          }],
        },
      },
    });
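
For completeness, firehoseDeliveryRole is granted permissions roughly as follows (a sketch using the CDK grant helpers; handler, logsBucket, elasticsearchDomain and firehoseloggroup are the constructs referenced above, and the exact policies depend on the setup):

// Sketch of the grants on firehoseDeliveryRole
handler.grantInvoke(firehoseDeliveryRole);                // Firehose invokes the transformation Lambda
this.logsBucket.grantReadWrite(firehoseDeliveryRole);     // S3 backup / delivery
elasticsearchDomain.grantReadWrite(firehoseDeliveryRole); // write to the ES domain
firehoseloggroup.grantWrite(firehoseDeliveryRole);        // Firehose error logging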

Upvotes: 1

Views: 2818

Answers (1)

Mihai

Reputation: 346

I have a CloudWatch log-group-1, Kinesis Firehose, a Lambda, and S3.

log-group-1 sends logs to Kinesis Firehose (using a subscription filter). Kinesis Firehose triggers the Lambda to process the logs. The Lambda returns the logs to Kinesis Firehose, and Kinesis Firehose saves the transformed logs to S3.

Lambda gets the following input:

{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "approximateArrivalTimestamp": 1625854080200,
      "data": "H4sIAAAAAAAAADWOwQrCM......"
    },
    {
      "recordId": "4961998142166134...",
      "approximateArrivalTimestamp": 1625854100311,
      "data": "H4sIAAAAAAAAADVPy07DMB......"
    }
  ]
}

To return the transformed message you must change the records list. See example:

"records": [
  {
    "recordId": "you better take it from the input",
    "result": "can be Ok, Dropped, ProcessingFailed",
    "data": "must be an encoded base-64 string"
  }
]

I attached code written in JavaScript. It is enough to just copy-paste it into the Lambda.

const node_gzip_1 = require("node-gzip");

async function handler(event) {
  console.log('event: ' + JSON.stringify(event, undefined, 3));
  let result = [];

  // Iterate through records list
  const records = event.records;
  for (let ii = 0; ii < records.length; ii++) {
    const record = records[ii];
    const recordId = record.recordId;

    // Transform record data to a human readable string
    const data = record.data;
    const decodedData = Buffer.from(data, 'base64');
    const ungziped = await node_gzip_1.ungzip(decodedData);
    console.log('ungziped: ' + ungziped);

    // Parse record data to JSON
    const dataJson = JSON.parse(ungziped.toString());

    // Get a list of log events and iterate through each element
    const logEventsList = dataJson.logEvents;
    logEventsList.forEach((logEventValue) => {
      // Get the message which was saved in CloudWatch
      const messageString = logEventValue.message;

      // Create the transformed result
      const transformedResultJson = {
        someRandomNumber: Math.random(), // Some random variable I decided to put in the result
        message: messageString + '-my-custom-change' // Edit the message
      };

      // Final data must be encoded to base 64
      const messageBase64 = Buffer.from(JSON.stringify(transformedResultJson) + '\n').toString('base64'); // Adding a new line to the transformed result is optional. It just makes reading the S3 objects easier
      console.log('messageBase64: ' + messageBase64);

      // Save transformed result
      result.push({
        recordId: recordId,
        result: 'Ok',
        data: messageBase64
      });
    });
  }

  // Replace initial records list with the transformed list
  event.records = result;
  console.log('new event: ' + JSON.stringify(event, undefined, 2));

  // Returned value will go back to kinesis firehose, then S3
  return event;
}
exports.handler = handler;
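
Note that node-gzip is a third-party npm package, so it has to be bundled with the Lambda deployment. If you prefer to avoid the extra dependency, the decompression step can also be done with Node's built-in zlib module, for example:

const zlib = require('zlib');

// Inside the loop over event.records: decode and gunzip with the built-in zlib module
const decodedData = Buffer.from(record.data, 'base64');
const ungziped = zlib.gunzipSync(decodedData);
const dataJson = JSON.parse(ungziped.toString());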

The Lambda return value is:

{
  "invocationId": "000ac99...",
  "deliveryStreamArn": "arn:aws:firehose:eu-central-1:123456789123:deliverystream/delivery-09",
  "region": "eu-central-1",
  "records": [
    {
      "recordId": "496199814216613477...",
      "result": "Ok",
      "data": "eyJzb21lUmF..."
    },
    {
      "recordId": "4961998142166134...",
      "result": "Ok",
      "data": "eyJzb21lUmFuZG9..."
    }
  ]
}

You can also use the Lambda blueprint kinesis-firehose-syslog-to-json.

Upvotes: 5
