fahadhub


Why is the event offset always 0 for my Kafka-triggered Nuclio function?

I wrote a simple handler function to explore the event object that the Kafka trigger passes to a Nuclio function, deployed it, and queued a few Kafka messages.

def handler(context, event):
    try:
        print(f"Variables inside the event object: {vars(event)}")
        print(f"Attributes of event object: {dir(event)}")
    except Exception as e:
        print(f"Exception {e} occurred")

In my pod logs, every time a Kafka message was queued, this was the output:

Variables inside the event object: {'body': b'{"value0":"value","value1":[1,2,3]}', 'content_type': b'', 'trigger': <nuclio_sdk.event.TriggerInfo object at 0x7f928c5c3040>, 'fields': {}, 'headers': {}, 'id': b'd6db95ca-0659-490b-b8f9-4f7dc4b09b70', 'method': b'', 'path': b'event_object_topic', 'size': 35, 'timestamp': datetime.datetime(2024, 3, 8, 8, 0, 14), 'url': b'', 'shard_id': 0, 'num_shards': 0, 'type': b'', 'type_version': b'', 'version': b'', 'last_in_batch': False, 'offset': 0}
Attributes of event object: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'deserialize', 'fields', 'from_json', 'from_msgpack', 'get_header', 'headers', 'id', 'last_in_batch', 'method', 'num_shards', 'offset', 'path', 'shard_id', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']

{"level":"debug","time":"2024-03-08T08:00:14.940Z","name":"processor.kafka-cluster.epic-others-event-loader.sarama","message":"Sarama: client/metadata fetching metadata for [event_object_topic] from broker vmwxvsapp05-xxxxxxxxxxxx:9092"}

Variables inside the event object: {'body': b'{"value0":"value","value1":[1,2,3]}', 'content_type': b'', 'trigger': <nuclio_sdk.event.TriggerInfo object at 0x7f6169cfe040>, 'fields': {}, 'headers': {}, 'id': b'cd8cde08-b70f-4f34-abee-c936c3e6acf9', 'method': b'', 'path': b'event_object_topic', 'size': 35, 'timestamp': datetime.datetime(2024, 3, 8, 8, 0, 15), 'url': b'', 'shard_id': 0, 'num_shards': 0, 'type': b'', 'type_version': b'', 'version': b'', 'last_in_batch': False, 'offset': 0}
Attributes of event object: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'deserialize', 'fields', 'from_json', 'from_msgpack', 'get_header', 'headers', 'id', 'last_in_batch', 'method', 'num_shards', 'offset', 'path', 'shard_id', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']

{"level":"debug","time":"2024-03-08T08:00:15.938Z","name":"processor.kafka-cluster.epic-others-event-loader.sarama","message":"Sarama: client/metadata fetching metadata for [event_object_topic] from broker vmwxvsapp05-xxxxxxxxxxxx:9092"}

Variables inside the event object: {'body': b'{"value0":"value","value1":[1,2,3]}', 'content_type': b'', 'trigger': <nuclio_sdk.event.TriggerInfo object at 0x7f928c5c3460>, 'fields': {}, 'headers': {}, 'id': b'e83bd16c-1d3c-490a-b3f1-187ae99617f8', 'method': b'', 'path': b'event_object_topic', 'size': 35, 'timestamp': datetime.datetime(2024, 3, 8, 8, 0, 18), 'url': b'', 'shard_id': 0, 'num_shards': 0, 'type': b'', 'type_version': b'', 'version': b'', 'last_in_batch': False, 'offset': 0}
Attributes of event object: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'deserialize', 'fields', 'from_json', 'from_msgpack', 'get_header', 'headers', 'id', 'last_in_batch', 'method', 'num_shards', 'offset', 'path', 'shard_id', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']

As the output shows, the offset is always 0. However, in Control Center, and in the topic itself, each message has a different offset.
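To make the comparison with Control Center easier, the handler could be narrowed to print only the fields relevant to message position. This is a minimal sketch, not a fix: `summarize_event` is a hypothetical helper name, it reads only attributes that appear in the output above (`path`, `shard_id`, `offset`, `body`), and the stand-in event built with `SimpleNamespace` exists only so the snippet runs outside Nuclio. That `shard_id` corresponds to the Kafka partition is an assumption.

```python
import json
from types import SimpleNamespace


def summarize_event(event):
    """Collect only the fields needed to compare an event with the topic."""
    path = event.path.decode() if isinstance(event.path, bytes) else event.path
    return {
        "topic": path,                 # 'path' held the topic name in the logs above
        "shard_id": event.shard_id,    # assumed to map to the Kafka partition
        "offset": event.offset,        # the value under investigation
        "body": json.loads(event.body),
    }


def handler(context, event):
    # Same print-based logging as the original handler, but one compact line.
    print(f"Kafka event summary: {summarize_event(event)}")


# Stand-in event mirroring the logged values, for running outside Nuclio only.
fake_event = SimpleNamespace(
    body=b'{"value0":"value","value1":[1,2,3]}',
    path=b"event_object_topic",
    shard_id=0,
    offset=0,
)
summary = summarize_event(fake_event)
```

With one line per message, it is easier to match each logged `offset` and `shard_id` against what Control Center reports for the same message body.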

Why is this happening?
