hokiegeek2
hokiegeek2

Reputation: 303

Kafka client poll() throws EOFError after each received message

I am using a slight variant of the example client polling code I found in the Confluent github:

c = Consumer({'bootstrap.servers':'localhost:9092','group.id':'devops','auto.offset.reset':'earliest'})
c.subscribe(['system-diskio-write-bytes','system-cpu-user-pct'])

try:
    while True:
        msg = c.poll(timeout=1000.0)
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
        else:
            print('topic: %s key: %s value: %s' % (msg.topic(), msg.key(), msg.value()))

except KeyboardInterrupt:
    print('Polling interrupted by consumer')

The EOF KafkaError is raised after every received message:

topic: system-diskio-write-bytes key: None value: b'{"route" : "system-diskio-write-bytes", "timestamp" : 2019-03-06T13:46:25.244, "value" : 655002980352.0}'
KafkaError{code=_PARTITION_EOF,val=-191,str="Broker: No more messages"}

I don't understand why this happening--any ideas as to why this error is being thrown how to fix? Any ideas are much appreciated--thanks!

Upvotes: 1

Views: 4965

Answers (1)

hokiegeek2
hokiegeek2

Reputation: 303

Okay, the answer is detailed here. The explanation is as follows:

An EOF event will be pushed on the internal message queue (served by poll()) each time the consumer hits a fresh end offset, regardless if your application does not call poll() for 10 seconds.

The EOF event can be disabled as follows:

enable.partition.eof=false

Upvotes: 1

Related Questions