Tristan Tran

Reputation: 1533

TypeError: partitions must be TopicPartition namedtuples

I want to use KafkaConsumer from kafka-python to consume the first N messages in a topic:

from kafka import KafkaConsumer as kc
import json

bootstrap_servers = ['xx.xxx.xx.xxx:9092']
topic_name = 'my_topic_name'

consumer = kc(topic_name, group_id='group1', bootstrap_servers=bootstrap_servers,
              auto_offset_reset='earliest', auto_commit_interval_ms=1000,
              enable_auto_commit=True,
              value_deserializer=lambda x: json.loads(x.decode('utf-8')))
count = 0
consumer.seek_to_beginning((topic_name,0))
kjson = []
for msg in consumer:
    if count < 10:
        count = count + 1
        kjson.append(msg.value)
    else:
        print(json.dumps(kjson, indent=4))
        break

This line consumer.seek_to_beginning((topic_name,0)) gives me the above error. The documentation specifies:

seek_to_beginning(*partitions)
Seek to the oldest available offset for partitions.

Parameters: *partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.
Raises: AssertionError – If any partition is not currently assigned, or if no partitions are assigned.

There are 32 partitions in this topic (indexed from 0 to 31). What would be the right syntax to start from the very beginning?

Upvotes: 0

Views: 1907

Answers (1)

OneCricketeer

Reputation: 191874

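As the error message says, the arguments

must be TopicPartition namedtuples

A plain tuple such as (topic_name, 0) is rejected, so build a TopicPartition instead, e.g.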

from kafka.structs import TopicPartition


...
consumer.seek_to_beginning(TopicPartition(topic_name,0))

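There are 32 partitions in this topic (indexed from 0 to 31)

seek_to_beginning takes *partitions, so unpack the list rather than passing it as a single argument (a list is not a TopicPartition and raises the same TypeError):

tps = [TopicPartition(topic_name, i) for i in range(32)]
consumer.seek_to_beginning(*tps)

Per the documentation quoted in the question, every partition you pass must already be assigned to the consumer, otherwise an AssertionError is raised.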
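For completeness, here is a minimal sketch of reading the first N values using manual assignment, so the partitions are already assigned when seek_to_beginning runs. It assumes kafka-python is installed and that the values are UTF-8 JSON as in the question. take_values and read_first_n are hypothetical helper names, and the 5-second consumer_timeout_ms is an arbitrary choice so iteration ends if the topic holds fewer than n messages.

```python
from itertools import islice


def take_values(messages, n):
    """Return the .value of at most the first n records from an iterable."""
    return [msg.value for msg in islice(messages, n)]


def read_first_n(bootstrap_servers, topic, n=10):
    """Hypothetical helper: read up to n values from the start of a topic."""
    # Imported lazily so take_values() can be tried without kafka-python.
    import json
    from kafka import KafkaConsumer, TopicPartition

    # No topic is passed to the constructor: assign() below does manual
    # assignment, which cannot be combined with a topic subscription.
    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        enable_auto_commit=False,
        consumer_timeout_ms=5000,  # assumption: give up after 5 s of silence
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
    )
    try:
        # Ask the cluster which partitions exist instead of hard-coding 32.
        partition_ids = consumer.partitions_for_topic(topic)
        if not partition_ids:
            raise ValueError('no partitions found for topic %r' % topic)
        tps = [TopicPartition(topic, p) for p in sorted(partition_ids)]
        consumer.assign(tps)
        # seek_to_beginning takes *partitions, so the list is unpacked.
        consumer.seek_to_beginning(*tps)
        return take_values(consumer, n)
    finally:
        consumer.close()
```

Calling read_first_n(['xx.xxx.xx.xxx:9092'], 'my_topic_name', 10) would return up to 10 values. Kafka only orders records within a partition, so with 32 partitions these are the first records the consumer happens to fetch, not a global "first 10" for the topic.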

Upvotes: 2
