Reputation: 23
I am observing a peculiar issue.
I have built a Lambda function in Java 8 that polls an MSK topic using the Kafka consumer API, i.e. consumer.poll(5000) (I have tried various timeouts). A second Lambda function is the producer, sending messages to the same topic. Both functions are attached to the VPC that MSK is in.
The producer works well; I can see the messages by running the Kafka console consumer from an EC2 instance.
But the consumer Lambda does not work; it just times out.
Only when I run the Lambda producer, the Kafka console consumer on EC2, and the Lambda consumer simultaneously does the consumer get some messages! To be precise, the producer sends 5 messages in a loop, the EC2 console consumer shows all 5, but the Lambda consumer shows only the 3rd or 4th message.
Why is this happening? What might be the issue here, and how can I get the messages consistently in the Lambda consumer?
If anybody has a working code sample, I would be very grateful.
Thank you.
Further update: I have scheduled the consumer function, and then it gets all the events. I still have the following questions: 1) Why is it not getting the messages when I trigger the function manually? 2) I tested a consumer function written in Python; that is not getting any messages either. Here is the Python code:
from kafka import KafkaConsumer

def lambda_handler(event, context):
    bootstrap_servers = ["<msk bootstrap>"]
    topicName = '<mp-topic-name>'
    consumer = KafkaConsumer(topicName,
                             group_id='test',
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=5000)
    for message in consumer:
        consumer.commit()
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key, message.value))
    consumer.close()  # was KafkaConsumer.close(), which targets the class, not this instance
    return "Processed"
Code of the Java consumer; it is a plain Kafka client. From Lambda it is able to read messages when run on a schedule, but not when tested manually.
try {
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(topicName));
    context.getLogger().log("Subscribed to topic " + topicName);
    ConsumerRecords<String, String> records = consumer.poll(5000);
    for (ConsumerRecord<String, String> record : records) {
        context.getLogger().log("Message::: offset = " + record.offset()
                + ", key = " + record.key() + ", value = " + record.value() + "\n");
    }
    context.getLogger().log("After messages");
} catch (Exception e) {
    e.printStackTrace();
    context.getLogger().log("Exception: " + e.getMessage());
}
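One thing worth noting (an assumption about the behavior here, not confirmed in the question): with a plain Kafka client, the first poll() after subscribe() also has to join the consumer group and wait for a partition assignment, so a single poll(5000) can legitimately return zero records even when messages exist. A common workaround is to poll in a loop until records arrive or an attempt budget runs out. A minimal sketch of such a loop, written against kafka-python's poll(timeout_ms=...) API (which returns a dict mapping partitions to record lists); the drain helper name and its parameters are made up for illustration:

```python
def drain(consumer, max_polls=10, poll_ms=1000):
    """Poll repeatedly, because a single poll can return nothing
    while the consumer group is still rebalancing.

    `consumer` is anything with a kafka-python-style
    poll(timeout_ms=...) method returning {partition: [records]}.
    """
    collected = []
    for _ in range(max_polls):
        batch = consumer.poll(timeout_ms=poll_ms)
        for records in batch.values():
            collected.extend(records)
        if collected:
            break  # got data; in a Lambda you would process and return here
    return collected
```

With a real KafkaConsumer this gives the group join several seconds to complete instead of giving up after one call.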
Upvotes: 0
Views: 1537
Reputation: 139
If only you'd held on for another month or so! AWS now supports Kafka event sources for Lambda, from both MSK and self-managed Kafka clusters (even outside of AWS):
https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
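With the MSK event source, Lambda does the polling for you and invokes the handler with batches of base64-encoded records, so no consumer client code is needed in the function. A minimal sketch of a handler for that event shape (the records/value field layout follows the documented MSK event format; the topic name and payloads are made-up examples):

```python
import base64

def lambda_handler(event, context):
    """Handle a batch delivered by the MSK event source.

    event["records"] maps "topic-partition" keys to lists of
    records whose key/value fields are base64-encoded.
    """
    decoded = []
    for records in event.get("records", {}).values():
        for record in records:
            value = base64.b64decode(record["value"]).decode("utf-8")
            decoded.append((record["topic"], record["partition"],
                            record["offset"], value))
    return decoded
```

Note that with this trigger there is no poll timeout to tune at all; Lambda scales the polling and only invokes the function when records are available.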
Upvotes: 1