Reputation: 41
In the consumer snippet below, I am able to receive the data that was sent. How do I access particular values from the message so I can work with them?
from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import (SerializerError,
                                             KeySerializerError,
                                             ValueSerializerError)
***
***
***
c.subscribe(['Topic'])
while True:
    try:
        msg = c.poll(10)
        print(msg)
Thanks
Upvotes: 0
Views: 429
Reputation: 39820
With an AvroConsumer, msg.value() returns the deserialized record as a dict, so there are two ways to read a field:
msg.value()['myFieldName']
or
msg.value().get('myFieldName')
For instance,
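from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'schema.registry.url': 'http://localhost:8081',  # the URL needs a scheme
    'group.id': 'test-group'
})
c.subscribe(['Topic'])
while True:
    try:
        msg = c.poll(10)
    except SerializerError as e:
        # poll() raised, so there is no new msg to report here
        print(f"Message deserialization failed:\n{e}")
        continue
    if msg is None:
        continue  # no message arrived within the timeout
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    record = msg.value()  # deserialized Avro record as a dict
    print(f"field1 Value: {record['field1']}")
    print(f"field2 Value: {record.get('field2')}")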
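The two forms differ only when the field is absent: indexing raises KeyError, while .get() returns None or a default you pass in. Here is a minimal sketch of that difference, using a plain dict as a stand-in for what msg.value() returns. The field names and the read_field helper are made up for illustration and are not part of confluent_kafka.

```python
# Stand-in for the dict returned by msg.value() on an AvroConsumer.
# The field names here are hypothetical.
record = {"field1": "abc", "field2": None}


def read_field(record, name, default=None):
    """Hypothetical helper: return record[name], or default if the field is absent."""
    return record.get(name, default)


# Indexing works when the field exists.
present = record["field1"]

# .get() falls back to the default when the field is absent.
missing = read_field(record, "field3", "n/a")

# Indexing an absent field raises KeyError instead.
try:
    record["field3"]
except KeyError:
    raised = True
else:
    raised = False

print(present, missing, raised)
```

Use indexing when a missing field should be treated as an error, and .get() when the field is optional in your schema.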
Upvotes: 1
Reputation: 191743
I see you're importing the AvroConsumer, so the message value is already deserialized into a dict and you would have
msg.value()['field']
Upvotes: 1