sdas

Reputation: 23

Problem in Lambda for Java consumer listening to MSK topic

I am observing a peculiar issue. I have built a Lambda function in Java 8 that polls an MSK topic using the Kafka consumer API, i.e. consumer.poll(5000) (I have tried various timeouts). A second Lambda function acts as the producer and sends messages to the same topic. Both functions are attached to the VPC that the MSK cluster is in.
The producer works well: I can see the messages by running the Kafka console consumer from an EC2 instance. But the consumer Lambda does not work; it just times out.

Only when I run the Lambda producer, the Kafka console consumer on the EC2 instance, and the Lambda consumer simultaneously does the Lambda consumer get any messages. To be precise, the producer sends 5 messages in a loop and the EC2 console consumer shows all 5, but the Lambda consumer shows only the 3rd or 4th message.

Why is this happening? What might be the issue here, and how can I get the messages consistently in the Lambda consumer?

If anybody has a working code sample, I would be very grateful.

Thank you.


Further update: I have scheduled the consumer function, and then it gets all the events. I still have the following questions: 1) Why does it not get the messages when I manually trigger the function? 2) I tested a consumer function written in Python, and it does not get any messages either. Here is the Python code:

    from kafka import KafkaConsumer  # kafka-python client

    def lambda_handler(event, context):
        bootstrap_servers = ["<msk bootstrap>"]
        topicName = '<mp-topic-name>'

        consumer = KafkaConsumer(topicName, group_id='test',
                                 bootstrap_servers=bootstrap_servers,
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=5000)

        # Iteration stops once no message arrives within consumer_timeout_ms.
        for message in consumer:
            consumer.commit()
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                 message.offset, message.key, message.value))

        # Close the consumer instance, not the KafkaConsumer class.
        consumer.close()
        return "Processed"

Here is the code of the Java consumer; it is a plain Kafka client. From Lambda, it is able to read messages when run on a schedule, but not when tested manually.

    // try-with-resources closes the consumer when the invocation ends
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {

        consumer.subscribe(Arrays.asList(topicName));

        context.getLogger().log("Subscribed to topic " + topicName);

        // Single poll: waits up to 5000 ms for records
        ConsumerRecords<String, String> records = consumer.poll(5000);

        for (ConsumerRecord<String, String> record : records) {
            context.getLogger().log("Message:::  offset = "+record.offset()+", key = "+record.key()+", value = "+record.value()+"\n");
        }
        context.getLogger().log("After messages");
    } catch (Exception e) {
        e.printStackTrace();
        context.getLogger().log("Exception: "+e.getMessage());
    }

Upvotes: 0

Views: 1537

Answers (1)

Misha Bruml

Reputation: 139

If only you had held on for another month or so! AWS Lambda now supports Kafka event sources from both MSK and self-managed Kafka clusters (even outside of AWS):

https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

https://aws.amazon.com/about-aws/whats-new/2020/12/aws-lambda-now-supports-self-managed-apache-kafka-as-an-event-source/
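With an event source mapping, Lambda does the polling and passes the function a batch of records, so the function no longer needs its own KafkaConsumer. As a rough sketch (not taken from the linked posts), the function below assumes the event arrives as a map whose "records" entry groups records by topic-partition, with each record's "value" Base64-encoded. The class name MskEventDecoder and the method decodeValues are made up for illustration, and wiring this into an actual Lambda handler is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: decodes record values from a Kafka event payload
// that has already been deserialized into nested maps and lists.
class MskEventDecoder {

    @SuppressWarnings("unchecked")
    static List<String> decodeValues(Map<String, Object> event) {
        List<String> values = new ArrayList<>();
        Object recordsObj = event.get("records");
        if (!(recordsObj instanceof Map)) {
            return values; // no records in this event
        }
        // Assumed layout: "records" maps "<topic>-<partition>" to a list of records.
        Map<String, Object> records = (Map<String, Object>) recordsObj;
        for (Object batch : records.values()) {
            for (Object item : (List<Object>) batch) {
                Map<String, Object> record = (Map<String, Object>) item;
                Object value = record.get("value");
                if (value != null) {
                    // Assumed: record values are Base64-encoded strings.
                    byte[] raw = Base64.getDecoder().decode((String) value);
                    values.add(new String(raw, StandardCharsets.UTF_8));
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Build a tiny sample event by hand; "mytopic-0" is a placeholder key.
        Map<String, Object> record = new HashMap<>();
        record.put("value", Base64.getEncoder()
                .encodeToString("hello".getBytes(StandardCharsets.UTF_8)));
        Map<String, Object> records = new HashMap<>();
        records.put("mytopic-0", Arrays.asList(record));
        Map<String, Object> event = new HashMap<>();
        event.put("records", records);

        System.out.println(decodeValues(event));
    }
}
```

Keeping the decoding in a plain static method makes it easy to test locally with a hand-built map before attaching the function to a real topic.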

Upvotes: 1
