I wrote a simple handler function to explore the event object that the Kafka trigger produces, deployed it, and queued a few Kafka messages.
def handler(context, event):
    try:
        print(f"Variables inside the event object: {vars(event)}")
        print(f"Attributes of event object: {dir(event)}")
    except Exception as e:
        print(f"Exception {e} occurred")
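Dumping vars() and dir() for every message is noisy, so a narrower variant can print only the fields relevant to the offset question. This is a minimal sketch that assumes the attribute names reported by dir(event) in the logs below (body, path, shard_id, offset, last_in_batch), and assumes that path holds the topic name and shard_id the partition, which is what the dump suggests but is not confirmed here. describe_kafka_event is a hypothetical helper, not part of nuclio_sdk.

```python
import json


# Hypothetical helper, not part of nuclio_sdk: gathers the Kafka-related
# attributes that appear in the vars(event) dump into one plain dict.
def describe_kafka_event(event):
    """Return the topic, partition, offset and decoded payload of an event."""
    try:
        # json.loads accepts str, bytes or bytearray (Python 3.6+).
        payload = json.loads(event.body)
    except (TypeError, ValueError):
        payload = event.body  # keep non-JSON bodies unchanged
    path = event.path
    if isinstance(path, (bytes, bytearray)):
        path = path.decode("utf-8")
    return {
        "topic": path,  # assumption: path holds the topic name, as in the logs
        "shard_id": event.shard_id,  # assumption: shard_id is the partition
        "offset": event.offset,
        "last_in_batch": event.last_in_batch,
        "payload": payload,
    }


def handler(context, event):
    # One compact line per message instead of the full vars()/dir() dump.
    print(describe_kafka_event(event))
```

For the messages above, each call would print the topic event_object_topic, the decoded JSON payload, and the shard_id/offset pair the trigger reports, which makes a stuck offset easy to spot.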
In my pod logs, every time a Kafka message was queued, this was the output:
Variables inside the event object: {'body': b'{"value0":"value","value1":[1,2,3]}', 'content_type': b'', 'trigger': <nuclio_sdk.event.TriggerInfo object at 0x7f928c5c3040>, 'fields': {}, 'headers': {}, 'id': b'd6db95ca-0659-490b-b8f9-4f7dc4b09b70', 'method': b'', 'path': b'event_object_topic', 'size': 35, 'timestamp': datetime.datetime(2024, 3, 8, 8, 0, 14), 'url': b'', 'shard_id': 0, 'num_shards': 0, 'type': b'', 'type_version': b'', 'version': b'', 'last_in_batch': False, 'offset': 0}
Attributes of event object: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'deserialize', 'fields', 'from_json', 'from_msgpack', 'get_header', 'headers', 'id', 'last_in_batch', 'method', 'num_shards', 'offset', 'path', 'shard_id', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
{"level":"debug","time":"2024-03-08T08:00:14.940Z","name":"processor.kafka-cluster.epic-others-event-loader.sarama","message":"Sarama: client/metadata fetching metadata for [event_object_topic] from broker vmwxvsapp05-xxxxxxxxxxxx:9092"}
Variables inside the event object: {'body': b'{"value0":"value","value1":[1,2,3]}', 'content_type': b'', 'trigger': <nuclio_sdk.event.TriggerInfo object at 0x7f6169cfe040>, 'fields': {}, 'headers': {}, 'id': b'cd8cde08-b70f-4f34-abee-c936c3e6acf9', 'method': b'', 'path': b'event_object_topic', 'size': 35, 'timestamp': datetime.datetime(2024, 3, 8, 8, 0, 15), 'url': b'', 'shard_id': 0, 'num_shards': 0, 'type': b'', 'type_version': b'', 'version': b'', 'last_in_batch': False, 'offset': 0}
Attributes of event object: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'deserialize', 'fields', 'from_json', 'from_msgpack', 'get_header', 'headers', 'id', 'last_in_batch', 'method', 'num_shards', 'offset', 'path', 'shard_id', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
{"level":"debug","time":"2024-03-08T08:00:15.938Z","name":"processor.kafka-cluster.epic-others-event-loader.sarama","message":"Sarama: client/metadata fetching metadata for [event_object_topic] from broker vmwxvsapp05-xxxxxxxxxxxx:9092"}
Variables inside the event object: {'body': b'{"value0":"value","value1":[1,2,3]}', 'content_type': b'', 'trigger': <nuclio_sdk.event.TriggerInfo object at 0x7f928c5c3460>, 'fields': {}, 'headers': {}, 'id': b'e83bd16c-1d3c-490a-b3f1-187ae99617f8', 'method': b'', 'path': b'event_object_topic', 'size': 35, 'timestamp': datetime.datetime(2024, 3, 8, 8, 0, 18), 'url': b'', 'shard_id': 0, 'num_shards': 0, 'type': b'', 'type_version': b'', 'version': b'', 'last_in_batch': False, 'offset': 0}
Attributes of event object: ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'body', 'content_type', 'deserialize', 'fields', 'from_json', 'from_msgpack', 'get_header', 'headers', 'id', 'last_in_batch', 'method', 'num_shards', 'offset', 'path', 'shard_id', 'size', 'timestamp', 'to_json', 'trigger', 'type', 'type_version', 'url', 'version']
As the output shows, the offset is always 0. However, in Control Center, and in the topic in general, each message has a different offset.
Why is this happening?
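Kafka gives every record in a partition its own offset, so two different messages (different id values) reporting the same shard_id and offset is itself the anomaly. A minimal sketch of a check for that from inside the function, assuming shard_id maps to the Kafka partition and path to the topic (both inferred from the dump, not confirmed); record_offset and _seen_offsets are hypothetical names. The history lives in module globals, so it is per Python process: if the function runs several workers, each one keeps its own.

```python
from collections import defaultdict

# Hypothetical diagnostic (not a nuclio_sdk API): remembers which offsets have
# already been seen for each (topic, shard) pair in this Python process.
_seen_offsets = defaultdict(set)


def record_offset(topic, shard_id, offset):
    """Record an offset; return True if it was already seen for this partition."""
    key = (topic, shard_id)
    repeated = offset in _seen_offsets[key]
    _seen_offsets[key].add(offset)
    return repeated


def handler(context, event):
    path = event.path
    topic = path.decode("utf-8") if isinstance(path, bytes) else path
    if record_offset(topic, event.shard_id, event.offset):
        # Distinct messages on one partition should never share an offset,
        # so a repeat means the reported offset is not the broker's offset.
        print(f"offset {event.offset} repeated on {topic}[{event.shard_id}], "
              f"event id {event.id!r}")
```

With the three messages above, the second and third would be flagged, since all of them report offset 0 on shard 0.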