zhenghuagui

Reputation: 103

How to fetch a field in a ConsumerRecord

I wrote a Python script:

#!/usr/bin/env python
from kafka import KafkaConsumer

consumer = KafkaConsumer('dimon_tcpdump', group_id='zhg_group', bootstrap_servers='192.168.100.9:9092')
for msg in consumer:
    print(msg)
    # process msg here

the msg output is like:

ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')

I know the output is in namedtuple form.

My problem is: how can I get a specific field of the ConsumerRecord? For example, I want to assign the value string to a variable.

Upvotes: 9

Views: 14170

Answers (2)

EasonL

Reputation: 863

As you know, msg is a namedtuple, so you can access its fields simply by attribute lookup, e.g.:

for msg in consumer:
    value_to_process = msg.value
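To see the namedtuple access patterns without a running broker, here is a minimal sketch. FakeRecord is a hypothetical stand-in built with collections.namedtuple using only the fields visible in the printed output above; the real ConsumerRecord in newer kafka-python versions carries additional fields, so positional indexing is shown only for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in for kafka-python's ConsumerRecord, limited to the
# fields visible in the printed output from the question.
FakeRecord = namedtuple(
    "FakeRecord",
    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"],
)

msg = FakeRecord(topic="ditopic", partition=0, offset=6280, timestamp=None,
                 timestamp_type=None, key=None, value="myvalue")

value_to_process = msg.value       # attribute lookup (the usual way)
same_value = msg[6]                # positional index works, but is fragile: it depends on field order
by_name = getattr(msg, "value")    # handy when the field name is held in a variable
as_dict = msg._asdict()            # mapping of field name -> value

print(value_to_process)            # myvalue
print(msg._fields)                 # the names of all fields in the record
```

Attribute lookup (msg.value) is the most readable choice and does not break if a library version adds fields.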

From the comments, it seems you were reassigning the printed string of msg back to msg:

msg = r'''ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')'''

That is why you were getting an AttributeError: at that point msg has been overwritten with a str object, and a str has no value attribute.
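A small sketch that reproduces the mistake, assuming a hypothetical two-field Record namedtuple in place of a real ConsumerRecord: once msg is replaced by its string form, attribute lookup fails.

```python
from collections import namedtuple

# Hypothetical stand-in record; a real ConsumerRecord behaves the same way here.
Record = namedtuple("Record", ["topic", "value"])

msg = Record(topic="ditopic", value="myvalue")
print(msg.value)  # myvalue: msg is still a namedtuple

# Overwriting msg with its printed text turns it into a plain str.
msg = str(msg)

error_message = None
try:
    _ = msg.value
except AttributeError as exc:
    error_message = str(exc)
    print(error_message)  # prints the AttributeError message
```

The fix is simply to keep working with the record object returned by the consumer instead of its printed representation.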

Upvotes: 3

Dominic Cabral

Reputation: 962

It might have to do with how you're deserializing the data. For example, if you wanted to read JSON from the message, you would initialize the consumer with:

value_deserializer=lambda m: json.loads(m.decode('utf-8'))
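To check what that deserializer does on its own, here is a minimal sketch that calls the same lambda on raw bytes; the payload is a hypothetical example, not data from the question.

```python
import json

# The same lambda as above: the consumer receives raw bytes, which are
# decoded to text and then parsed as JSON.
value_deserializer = lambda m: json.loads(m.decode("utf-8"))

raw_bytes = b'{"src": "10.0.0.1", "len": 60}'  # hypothetical payload
parsed = value_deserializer(raw_bytes)
print(parsed["len"])  # 60
```

With the deserializer set on the consumer, msg.value is already this parsed object, so no further decoding is needed inside the loop.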

So your code would look something like this:

#!/usr/bin/env python
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'dimon_tcpdump',
    group_id='zhg_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    bootstrap_servers='192.168.100.9:9092',
)
for msg in consumer:
    print(msg.value)  # already a Python object parsed from JSON
    # process msg here
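The lambda above raises if a message value is not valid UTF-8 JSON, or if it is None (there is no .decode on None). A more defensive variant is sketched below, assuming you would rather receive None for such messages than have the loop stop; safe_json_deserializer is a hypothetical name, passed as value_deserializer=safe_json_deserializer.

```python
import json
import logging


def safe_json_deserializer(raw):
    """Decode a message value as UTF-8 JSON, returning None on failure.

    Hypothetical helper: pass it as value_deserializer=safe_json_deserializer.
    """
    if raw is None:  # e.g. a message with a null value
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Log a short prefix of the bad payload and skip it.
        logging.warning("Skipping value that is not valid UTF-8 JSON: %r", raw[:50])
        return None


print(safe_json_deserializer(b'{"ok": true}'))  # {'ok': True}
print(safe_json_deserializer(b"not json"))      # None
print(safe_json_deserializer(None))             # None
```

In the consumer loop you would then skip records whose msg.value is None before processing them.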

Upvotes: 6
