Jal

Reputation: 2312

Where exactly is the default partition decided in the Kafka producer?

I am trying to grep through the source code, and all I am able to find is

/Users/myUser/.m2/repository/org/apache/kafka/kafka_2.10/0.9.0.2/kafka_2.10-0.9.0.2.jar!/kafka/javaapi/producer/Producer.class

public void send(List<KeyedMessage<K, V>> messages) {
   this.underlying().send(scala.collection.JavaConversions..MODULE$.asScalaBuffer(messages).toSeq());
}

which calls

/Users/myUser/.m2/repository/org/apache/kafka/kafka_2.10/0.9.0.2/kafka_2.10-0.9.0.2.jar!/kafka/producer/Producer.class

public void send(Seq<KeyedMessage<K, V>> messages) {
    synchronized(this.lock()) {
        if(this.hasShutdown().get()) {
            throw new ProducerClosedException();
        } else {
            this.recordStats(messages);
            boolean var3 = this.sync();
            BoxedUnit var4;
            if(var3) {
                this.eventHandler().handle(messages);
                var4 = BoxedUnit.UNIT;
            } else {
                if(var3) {
                    throw new MatchError(BoxesRunTime.boxToBoolean(var3));
                }

                this.asyncSend(messages);
                var4 = BoxedUnit.UNIT;
            }

            BoxedUnit var10000 = BoxedUnit.UNIT;
        }
    }
}

It's quite confusing, because I expected both send functions to accept a partition id chosen by the default partitioner, like

public void send(List<KeyedMessage<K, V>> messages, int partitionId)

or to somehow call the default partitioner inside the send method to get the partition id.

But I don't see the partition id used anywhere.

Can someone point me in the right direction on where the partition is decided?

Is it before we call producer.send?

Upvotes: 0

Views: 2133

Answers (1)

OneCricketeer

Reputation: 191681

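You're looking at the old API. There, the partitioning logic is in the EventHandler, which is called by the send method (the this.eventHandler().handle(messages) call in your snippet).

For the newer clients, you can see it in the doSend method of KafkaProducer: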

 int partition = partition(record, serializedKey, serializedValue, cluster);
 tp = new TopicPartition(record.topic(), partition);

The partitioner class is loaded from the properties given to the producer (the partitioner.class setting). If that property is not defined, the default partitioner is used.
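To make the idea concrete, here is a minimal, self-contained sketch of what a key-based partitioner roughly does: hash the serialized key, force the result to be non-negative, and take it modulo the number of partitions. This is not Kafka's code. The class name PartitionSketch and the method choosePartition are hypothetical, Arrays.hashCode stands in for the murmur2 hash that the Java client's default partitioner applies to keyed records, and the keyless branch is a simplified round-robin (the real behaviour for records without a key differs between client versions).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of the Kafka client library.
public class PartitionSketch {
    // Counter used to spread keyless records across partitions.
    private static final AtomicInteger counter = new AtomicInteger(0);

    // Returns a partition index in [0, numPartitions).
    static int choosePartition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: rotate through the partitions (simplified round-robin).
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Keyed record: hash the serialized key, clear the sign bit, take the modulo.
        // Arrays.hashCode is a stand-in for the hash used by the real client.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        int first = choosePartition(key, 6);
        int second = choosePartition(key, 6);
        // The same key always maps to the same partition.
        System.out.println(first == second); // prints "true"
    }
}
```

The point of the sketch is that the partition is derived from the record itself (its key and the topic's partition count) at send time, which is why send does not take a partition id parameter.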

Upvotes: 3
