Reputation: 1647
I am exploring akka-stream-kafka for one of my use cases and going through this documentation. According to the documentation, the producer sink divides the payload, i.e. the data records, equally across all the Kafka partitions, which is logical. However, I want to have control over the partition a message goes to. My use case: I will get millions of rows keyed by record_id, and I want all the records with the same record_id (let's say 1234) to go to the same partition (let's say partition number 10).
To summarize it: let's say I have 10,000 records and 10 partitions, and out of those 10,000 records, 3,700 have record_id 1234. Let's say Kafka sent that record_id to partition number 1. Then I want all those 3,700 records to go through partition 1, because I want to maintain the order of those records. Similarly for the other record_ids. The plainSink implementation that the documentation shows divides the records evenly across all partitions.
Is there a way I can control the record flow based on hashing of keys?
Upvotes: 0
Views: 89
Reputation: 9023
When you create the ProducerRecord, you have the chance to provide the partition index where you want it to end up. To calculate the partition index you can simply use recordId % numberOfPartitions, which ensures that all messages with the same recordId end up in the same partition.
Example below:
import akka.NotUsed
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord

val source: Source[Record, NotUsed] = ???

source
  .map { record =>
    // 10 partitions assumed; the same recordId always maps to the same partition
    val partition = record.recordId % 10
    new ProducerRecord[Array[Byte], Record]("topic1", partition, null, record)
  }
  .runWith(Producer.plainSink(producerSettings))
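As a side note: instead of hard-coding a partition index, you can set a key on the ProducerRecord and leave the partition unset; Kafka's default partitioner then hashes the key to choose the partition, which matches the "hashing of keys" you asked about. A minimal sketch, assuming producerSettings is configured with a String key serializer:

source
  .map { record =>
    // No explicit partition: Kafka's default partitioner hashes the key,
    // so records sharing a recordId still land on the same partition.
    new ProducerRecord[String, Record]("topic1", record.recordId.toString, record)
  }
  .runWith(Producer.plainSink(producerSettings))

With this approach you don't need to know the partition count up front, and per-recordId ordering is preserved as long as the number of partitions doesn't change.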
Upvotes: 1