Reputation: 3050
For my exploration, I created an AWS Firehose stream and configured a Lambda function to move the data to S3. Firehose to S3 works fine without any issue. But when I enable the Lambda function, I get the error below in the S3 failed bucket.
{"attemptsMade":4,"arrivalTimestamp":1570727830210,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."
Lambda Java code:
public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, String> {
    @Override
    public String handleRequest(KinesisEvent event, Context context) {
        context.getLogger().log("Input: " + event);
        StringBuffer sb = new StringBuffer();
        for (KinesisEventRecord record : event.getRecords()) {
            String payload = new String(record.getKinesis().getData().array());
            if (payload.toLowerCase().contains("scala"))
                sb.append(payload);
            sb.append("\n");
        }
        return sb.toString();
    }
}
Basically, I want to filter the incoming streaming data and push it to S3. I also have a few questions:
1. I am passing a bunch of JSON data to Firehose. Does "record.getKinesis().getData()" read the records line by line, or does it return the whole batch as one JSON string?
2. I have written a log statement. Where can I check my logs?
How can I handle this scenario? Please advise.
Upvotes: 0
Views: 2011
Reputation: 17435
The AWS Lambda Java Events 2.x library has support for the KinesisFirehoseEvent. The 1.x library did not have this class.
Your code would look something like:
public class LambdaFunctionHandler implements RequestHandler<KinesisFirehoseEvent, String> {
    @Override
    public String handleRequest(KinesisFirehoseEvent event, Context context) {
        // Process event.getRecords() here.
        return null; // placeholder so the skeleton compiles
    }
}
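Note that a Firehose data-transformation Lambda is expected to hand back, for every incoming record, the same recordId, a result of Ok, Dropped, or ProcessingFailed, and the data Base64-encoded. The following is only a rough, stdlib-only sketch of the "keep records containing scala" filter under that contract. The class and type names (FirehoseFilterSketch, InRecord, OutRecord) are hypothetical stand-ins and are not the AWS library classes, and the sketch works on the raw Base64 string as it appears in the JSON event rather than on the library's record type.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-ins for the Firehose record types; NOT the AWS library classes.
final class FirehoseFilterSketch {

    /** One incoming record: its id plus the Base64-encoded payload. */
    static final class InRecord {
        final String recordId;
        final String data;

        InRecord(String recordId, String data) {
            this.recordId = recordId;
            this.data = data;
        }
    }

    /** One outgoing record in the shape Firehose expects back. */
    static final class OutRecord {
        final String recordId;
        final String result; // "Ok", "Dropped", or "ProcessingFailed"
        final String data;   // Base64-encoded payload

        OutRecord(String recordId, String result, String data) {
            this.recordId = recordId;
            this.result = result;
            this.data = data;
        }
    }

    /** Keeps records whose payload mentions "scala" and marks the rest as Dropped. */
    static List<OutRecord> transform(List<InRecord> records) {
        List<OutRecord> out = new ArrayList<>();
        for (InRecord r : records) {
            String payload = new String(
                    Base64.getDecoder().decode(r.data), StandardCharsets.UTF_8);
            if (payload.toLowerCase(Locale.ROOT).contains("scala")) {
                // Terminate each kept record with a newline so lines stay separate in S3.
                String line = payload.endsWith("\n") ? payload : payload + "\n";
                String encoded = Base64.getEncoder()
                        .encodeToString(line.getBytes(StandardCharsets.UTF_8));
                out.add(new OutRecord(r.recordId, "Ok", encoded));
            } else {
                // Every recordId must still be returned, even for filtered-out records.
                out.add(new OutRecord(r.recordId, "Dropped", r.data));
            }
        }
        return out;
    }
}
```

In a real handler the same loop would run over event.getRecords(), and the returned object would need to serialize to a JSON document with a "records" array of these three fields, instead of a plain String.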
From the Lambda test environment, the event will come in looking like:
{
  "invocationId": "invocationIdExample",
  "deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
  "region": "us-west-2",
  "records": [
    {
      "recordId": "49546986683135544286507457936321625675700192471156785154",
      "approximateArrivalTimestamp": 1495072949453,
      "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
    }
  ]
}
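The "data" field in that event is Base64-encoded, so each record carries its own payload rather than one big JSON string for the whole batch. As a small illustration, this stdlib-only snippet decodes the sample value above; the class name DecodeSampleRecord is just a hypothetical helper for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for illustration; not part of any AWS library.
final class DecodeSampleRecord {

    /** Decodes the Base64 "data" field of a Firehose record into UTF-8 text. */
    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

Calling DecodeSampleRecord.decode("SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=") returns "Hello, this is a test 123.".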
Upvotes: 1