Reputation: 2620
I have millions of records, each with a unique identifier.
All the records are categorized by series number: say 10k records belong to series-1, another 10k to series-2, and so on.
I want to publish all series-1 records to partition-1, all series-2 records to partition-2, and so on.
I don't want to use the message key to achieve this. Is there an alternative?
I am new to Kafka, so please correct me if the question is wrong or lacks detail.
Upvotes: 1
Views: 970
Reputation: 3832
You can publish a message to a specific partition in any of the following ways.
Simple Kafka Producer
/**
 * Creates a record to be sent to a specified topic and partition
 */
public ProducerRecord(String topic, Integer partition, K key, V value) {
    this(topic, partition, null, key, value, null);
}
Basic example of publishing a message to a specific partition
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, <bootstrap server detail>);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Choose the partition from the record's series, for example series-1 to partition 1 and series-2 to partition 2
int partition=getPartitionOnCondition.....
String topic=""..
Producer<String, String> producer = new KafkaProducer<String, String>(properties);
// The key may be null; a partition set explicitly on the record takes precedence over the key
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, partition, key, value);
producer.send(record);
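The getPartitionOnCondition step is left open above. As a minimal sketch of one way to fill it in, assuming series are named "series-N" with N starting at 1 (the class name, method name, and naming scheme here are hypothetical, not part of Kafka), the series number can be mapped to a zero-based partition index using only the JDK:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class SeriesPartitions {
    // Assumed naming scheme: "series-<number>", e.g. "series-1"
    private static final Pattern SERIES = Pattern.compile("series-(\\d{1,9})");

    /**
     * Maps "series-N" to a zero-based partition index.
     * Wraps around when there are more series than partitions.
     */
    static int partitionFor(String series, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        Matcher m = SERIES.matcher(series);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected series name: " + series);
        }
        int seriesNumber = Integer.parseInt(m.group(1));
        // Kafka partitions are numbered from 0, so series-1 lands on partition 0
        return Math.floorMod(seriesNumber - 1, numPartitions);
    }
}
```

The result can be passed as the partition argument of ProducerRecord. Note that the topic needs at least as many partitions as there are series if each series must have a partition to itself; otherwise several series share a partition.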
Custom Partitioner
You can also configure a custom partitioner for the producer, or a stream partitioner for Kafka Streams. See the documentation:
https://kafka.apache.org/documentation.html
Custom Stream Partitioner (in case you are using Kafka Streams)
If you are using Kafka Streams, it also lets you plug in a custom partitioner through the StreamPartitioner interface: https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/processor/StreamPartitioner.html
Upvotes: 1
Reputation: 3173
It's better to create a custom Partitioner class for your producer application.
For every record the producer publishes (unless the record already specifies a partition), the custom partitioner's partition() method is invoked with the message's key and value. There you can write logic that parses the relevant field to determine the partition number the message should be written to.
Create a custom partitioner class similar to the one below:
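import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitioner extends DefaultPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer partitionNumber = null;
        if (valueBytes != null) {
            String valueStr = new String(valueBytes, StandardCharsets.UTF_8);
            /* code to extract the partition number from valueStr */
            /* assign partitionNumber decided based on the value */
        }
        // Fall back to the default partitioning when no partition was chosen;
        // returning a null Integer as int would throw a NullPointerException
        if (partitionNumber == null) {
            return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
        }
        return partitionNumber;
    }
}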
Register the partitioner in your producer configuration and start publishing messages:
props.put("partitioner.class", "com.example.CustomPartitioner");
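The extraction step left as comments in partition() above depends entirely on how your values are serialized. As a rough sketch only, assuming a hypothetical value layout of "<seriesNumber>|<recordId>|<payload>" encoded as UTF-8 (the layout, class name, and method names are illustrative assumptions), the parsing could be done with the JDK alone:

```java
import java.nio.charset.StandardCharsets;
import java.util.OptionalInt;

final class SeriesValueParser {
    /**
     * Reads the series number from the first '|'-delimited field.
     * Returns empty when the value is missing or malformed.
     */
    static OptionalInt seriesNumber(byte[] valueBytes) {
        if (valueBytes == null) {
            return OptionalInt.empty();
        }
        String value = new String(valueBytes, StandardCharsets.UTF_8);
        int sep = value.indexOf('|');
        if (sep <= 0) {
            return OptionalInt.empty();
        }
        try {
            int n = Integer.parseInt(value.substring(0, sep).trim());
            return n > 0 ? OptionalInt.of(n) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    /** Zero-based partition for the value, or the given fallback if no series number is found. */
    static int partitionOrDefault(byte[] valueBytes, int numPartitions, int fallback) {
        OptionalInt series = seriesNumber(valueBytes);
        return series.isPresent()
                ? Math.floorMod(series.getAsInt() - 1, numPartitions)
                : fallback;
    }
}
```

Inside a custom partitioner, the number of partitions would typically come from the Cluster argument (for example cluster.partitionCountForTopic(topic)), and the fallback could be the default partitioner's result.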
Upvotes: 0