johnhckuo
johnhckuo

Reputation: 365

How do I know which partition to consume in Kafka?

I have one topic with 3 partitions in Kafka broker 1 partitions have message and 2 partitions are empty, how do I know which partition to consume in one call?

First I assign a TopicPartition with partition equals to kafka.PartitionAny, but this value keeps returning -1

So I have to manually use a counter, and when I successfully consume from one partition but with null message, then count++ and start with next one, until I find the message


for{

    partitions = append(partitions, kafka.TopicPartition{
        Topic:     &topic,
        Partition: partition,
        Offset:    offSet,
        Error:     err,
    })


    err = c.Assign(partitions)
    if err != nil {
         return err
    }

    // retrieve message
    ev, err := c.Poll(-1)
    if err != nil {
         return err
    }

    // if no message, check the next partition
    if ev == nil{
         partition++
    }else{
         break
    }

}

The first two round does not return any message, but it has to wait for the third round to return, is there any way it can automatically detect which partition is stored with un-consumed message?

If there is no other way, can Kafka do the round-robin routing for me? or I have to record the counter myself

Thanks! :)

Upvotes: 0

Views: 1297

Answers (1)

Yannick
Yannick

Reputation: 1418

You should definitely use subscribe() method and then call poll(). If there are available records, you'll get a Records answer containing one or multiple records ( associated with different partitions).

After processing a record, you can then commit manually ( if you use enable.auto.commit = false) using the medata contained in the record ( topic, partitions, offset, etc..).

Yannick

Upvotes: 1

Related Questions