Learn Hadoop
Learn Hadoop

Reputation: 3050

AWS Kinesis Firehose to Lambda , Lambda to S3 using java

For my expolartion , created AWS firehose stream and configured Lambda function and move the data to S3. Firehose to S3 it is working fine without any issue. If i enable lamda function, getting below error in S3 failed bucket.

{"attemptsMade":4,"arrivalTimestamp":1570727830210,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."

lambda java code:

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, String> {

    @Override
    public String handleRequest(KinesisEvent event, Context context) {
        context.getLogger().log("Input: " + event);
        StringBuffer sb = new StringBuffer();
        for (KinesisEventRecord record : event.getRecords()) {
            String payload = new String(record.getKinesis().getData().array());
            if (payload.toLowerCase().contains("scala"))
                sb.append(payload);
            sb.append("\n");
        }

        return sb.toString();
    }
}

Basically , filter incoming streaming data and push to S3. Also I have frw questions. 1 . i am passing punch JSON data to firehose. "record.getKinesis().getData()" method will read the record line by line and bunch to entire json string. 2. written log statment. where to check my log. How can i handle this scenario? please advise

Upvotes: 0

Views: 2011

Answers (1)

stdunbar
stdunbar

Reputation: 17435

The AWS Lambda Java Events 2.x library has support for the KinesisFirehoseEvent. The 1.x library did not have this class.

Your code would look something like:

public class LambdaFunctionHandler implements RequestHandler<KinesisFirehoseEvent, String> {

    @Override
    public String handleRequest(KinesisFirehoseEvent event, Context context) {
    }
}

From the Lambda test environment, the event will come in looking like:

{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-west-2",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
    }
  ]
}

Upvotes: 1

Related Questions