Reputation: 2312
I am trying to grep through source code and all I am able to find is
/Users/myUser/.m2/repository/org/apache/kafka/kafka_2.10/0.9.0.2/kafka_2.10-0.9.0.2.jar!/kafka/javaapi/producer/Producer.class
public void send(List<KeyedMessage<K, V>> messages) {
    this.underlying().send(scala.collection.JavaConversions..MODULE$.asScalaBuffer(messages).toSeq());
}
which calls
/Users/myUser/.m2/repository/org/apache/kafka/kafka_2.10/0.9.0.2/kafka_2.10-0.9.0.2.jar!/kafka/producer/Producer.class
public void send(Seq<KeyedMessage<K, V>> messages) {
    synchronized(this.lock()) {
        if(this.hasShutdown().get()) {
            throw new ProducerClosedException();
        } else {
            this.recordStats(messages);
            boolean var3 = this.sync();
            BoxedUnit var4;
            if(var3) {
                this.eventHandler().handle(messages);
                var4 = BoxedUnit.UNIT;
            } else {
                if(var3) {
                    throw new MatchError(BoxesRunTime.boxToBoolean(var3));
                }
                this.asyncSend(messages);
                var4 = BoxedUnit.UNIT;
            }
            BoxedUnit var10000 = BoxedUnit.UNIT;
        }
    }
}
It's quite confusing, because I expected send to either accept a partition id from the default partitioner, like
public void send(List<KeyedMessage<K, V>> messages, int partitionId)
or to call the default partitioner somewhere in its body to get the partition id.
But I don't see the partition id used anywhere.
Can someone point me to where the partition is decided?
Is it before we call producer.send?
Upvotes: 0
Views: 2133
Reputation: 191681
You're looking at the old API. There, the partitioning logic is in the EventHandler, which the send method calls (the this.eventHandler().handle(messages) line in your snippet).
For the newer clients, you can see it in the doSend method of KafkaProducer:
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
The partitioner class is loaded from the properties given to the Producer (the partitioner.class setting). If none is defined, the default partitioner is used.
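To make the idea concrete, here is a rough, self-contained sketch of how a partition can be chosen when send itself takes no partition argument. It is not Kafka's code: the class PartitionChoiceSketch and its methods are hypothetical names, and Arrays.hashCode stands in for whatever hash the real default partitioner uses. It only assumes the general shape: an explicit partition on the record wins, otherwise the key is hashed, otherwise partitions are cycled.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; names and hash function are assumptions,
// not taken from the Kafka source.
class PartitionChoiceSketch {
    // Counter used to spread records that have no key across partitions.
    private static final AtomicInteger COUNTER = new AtomicInteger(0);

    // Stand-in for a default partitioner: hash the key bytes, or cycle
    // through partitions when there is no key.
    public static int defaultPartition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // Mask the sign bit so the modulo result is never negative.
            return (COUNTER.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Same key bytes always map to the same partition.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Stand-in for the decision made inside send: a partition set on the
    // record takes precedence, otherwise the partitioner is asked.
    public static int choosePartition(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        return explicitPartition != null
                ? explicitPartition
                : defaultPartition(keyBytes, numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        // Explicit partition: returned unchanged.
        System.out.println(choosePartition(3, key, 6));
        // No explicit partition: derived from the key.
        System.out.println(choosePartition(null, key, 6));
        // No partition and no key: next slot in the cycle.
        System.out.println(choosePartition(null, null, 6));
    }
}
```

So the caller never passes a partition id to send. It is either carried by the record itself or computed from the key inside the producer.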
Upvotes: 3