Reputation: 103
I wrote a Python script:
#!/usr/bin/env python
from kafka import KafkaConsumer

consumer = KafkaConsumer('dimon_tcpdump', group_id='zhg_group', bootstrap_servers='192.168.100.9:9092')
for msg in consumer:
    print(msg)
    # process msg here
The printed msg looks like this:
ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')
I know the output is a namedtuple.
My problem is: how can I get a specific field of the ConsumerRecord?
For example, I want to assign the value string to a variable.
Upvotes: 9
Views: 14170
Reputation: 863
As you know, your msg is a namedtuple, so you can access its fields simply by attribute lookup, e.g.:
for msg in consumer:
    value_to_process = msg.value
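Because the record is a namedtuple, the other standard namedtuple tools work on it as well. The sketch below defines a stand-in `ConsumerRecord` with only the fields visible in the question's output (the real kafka-python class may define more, depending on the version), so it runs without a broker:

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord, limited to the fields shown
# in the question's output; the real class may have additional fields.
ConsumerRecord = namedtuple(
    "ConsumerRecord",
    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"],
)

msg = ConsumerRecord(
    topic="ditopic", partition=0, offset=6280,
    timestamp=None, timestamp_type=None, key=None, value="myvalue",
)

value_to_process = msg.value       # attribute lookup (the usual way)
offset = msg[2]                    # positional indexing also works, but is less readable
field_name = "topic"
topic = getattr(msg, field_name)   # lookup when the field name is held in a variable
as_dict = msg._asdict()            # mapping of field name -> value
print(msg._fields)                 # names of all available fields
print(value_to_process, offset, topic)
```

Printing `msg._fields` on a real record is a quick way to see which fields your installed kafka-python version provides.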
From the comment section, it seems that you were reassigning the printed string of msg back to msg with:
msg = r'''ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')'''
That is why you were getting an AttributeError: at that point msg has been overwritten and is a str object, and str has no value attribute.
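A small sketch of that pitfall, again using a hypothetical stand-in namedtuple in place of a real record: once the name holds the printed text, attribute lookup fails, while the record object itself still works.

```python
from collections import namedtuple

# Stand-in for kafka-python's ConsumerRecord (fields from the question's output).
ConsumerRecord = namedtuple(
    "ConsumerRecord",
    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"],
)

record = ConsumerRecord("ditopic", 0, 6280, None, None, None, "myvalue")

# Rebinding a name to the record's printed text leaves you with a plain str...
msg = str(record)
error_message = None
try:
    msg.value
except AttributeError as exc:
    error_message = str(exc)  # e.g. 'str' object has no attribute 'value'

# ...so keep a reference to the record object and read its attribute instead.
value = record.value
```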
Upvotes: 3
Reputation: 962
It might have to do with how you're deserializing the data. For example, if you want to read JSON from the msg, you would initialize the KafkaConsumer with:
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
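The deserializer is just a callable applied to each message's raw bytes value, so it can be tried on its own. The sketch below uses a named function equivalent to the lambda above; the payload is a made-up example:

```python
import json


def json_deserializer(m):
    """Equivalent to the lambda above: raw bytes -> UTF-8 str -> Python object."""
    return json.loads(m.decode("utf-8"))


raw = b'{"src": "10.0.0.1", "len": 60}'  # hypothetical JSON payload
value = json_deserializer(raw)
print(value["src"], value["len"])
```

Note that a payload that is not valid JSON will make `json.loads` raise, so this assumes every message on the topic is JSON-encoded.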
So your code would look something like this:
#!/usr/bin/env python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'dimon_tcpdump',
    group_id='zhg_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    bootstrap_servers='192.168.100.9:9092'
)
for msg in consumer:
    print(msg.value)
    # process msg here
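With a JSON deserializer configured, msg.value is already a Python object (a dict for a JSON object), so individual fields are read by key. A minimal sketch that keeps the loop body in a function, so it can be exercised with a list of stand-in records instead of a live consumer; the `Record` type and the `"src"` key are hypothetical:

```python
from collections import namedtuple

# Hypothetical stand-in exposing only the attributes the loop uses.
Record = namedtuple("Record", ["offset", "value"])


def extract_field(records, key):
    """Yield (offset, value[key]) for each record whose deserialized value has `key`."""
    for msg in records:        # a KafkaConsumer can be iterated the same way
        payload = msg.value    # already deserialized, e.g. a dict
        if isinstance(payload, dict) and key in payload:
            yield msg.offset, payload[key]


records = [
    Record(offset=1, value={"src": "10.0.0.1"}),
    Record(offset=2, value={"dst": "10.0.0.2"}),  # no "src" key: skipped
]
print(list(extract_field(records, "src")))
```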
Upvotes: 6