Reputation: 349
I have a use case where i need to write a custom logic to assign the partition based on certain key parameters from the message. I did some research on this and found that kafka transformation supports to override some methods in Transformation interface but i was unable to do some example code in git hub or somewhere. can someone please share the sample code or git hub link to do the custom partition assignment in kafka JDBC source connector?
Thanks in advance!.
Upvotes: 3
Views: 1635
Reputation: 6593
Kafka Connect to assigning partitions by default uses: DefaultPartitioner
(org.apache.kafka.clients.producer.internals.DefaultPartitioner
)
If you need to override default one with some custom, it is possible, but you have to remember, that overriding applies to all Source Connectors.
To do that you have to set producer.partitioner.class
property, ex producer.partitioner.class=com.example.CustomPartitioner
.
Additionally you have to copy jar with your partitioner to directory with Kafka Connect libraries.
Transformation way:
Setting the partition is also possible in Transformation, but it is not proper approach.
From Transformation
you don't have access to topic metadata, that are crucial for assigning partitions.
If anyway you would like to set partitions for your records, code should look like this:
public class AddPartition <R extends ConnectRecord<R>> implements Transformation<R> {
public static final ConfigDef CONFIG_DEF = new ConfigDef();
@Override
public void configure(Map<String, ?> props) {
final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
}
@Override
public R apply(R record) {
return record.newRecord(record.topic(), calculatePartition(record), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp());
}
private Integer calculatePartition(R record) {
// Partitions calcuation based on record information
return 0;
}
@Override
public void close() {
}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
}
Upvotes: 2