Reputation: 457
I am trying to implement a custom Kafka Partitioner using spring cloud stream bindings. I would like to just custom Partition the user topic and not do anything with company topic(Kafka will use DefaultPartitioner in this case).
My bindings configuration:
spring:
cloud:
stream:
bindings:
comp-out:
destination: company
contentType: application/json
user-out:
destination: user
contentType: application/json
As per reference document: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#_partitioning_with_the_kafka_binder I modified the configuration to this:
spring:
cloud:
stream:
bindings:
comp-out:
destination: company
contentType: application/json
user-out:
destination: user
contentType: application/json
producer:
partitioned: true
partitionSelectorClass: config.UserPartitioner
I Post the message into Stream using this:
public void postUserStream(User user) throws ServiceException {
try {
LOG.info("Posting User {} into Kafka stream...", user);
MessageChannel messageChannel = messageStreams.outboundUser();
messageChannel
.send(MessageBuilder.withPayload(user)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
} catch (Exception ex) {
LOG.error("Error while populating User stream into Kafka.. ", ex);
throw ex;
}
}
My UserPartitioner Class:
public class UserPartitioner extends DefaultPartitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
Cluster cluster) {
String partitionKey = null;
if (Objects.nonNull(value)) {
User user = (User) value;
partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
keyBytes = partitionKey.getBytes();
}
return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
}
}
I end up receiving following exception:
Description: Failed to bind properties under 'spring.cloud.stream.bindings.user-out.producer' to org.springframework.cloud.stream.binder.ProducerProperties:
Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned
Action: Update your application's configuration
Any reference link on how to set up Custom Partition using message binders will be helpful.
Edit: Based on the documentation Tried the below steps as well:
user-out: destination: user contentType: application/json producer: partitionKeyExtractorClass: config.SimpleUserPartitioner
@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {
@Override
public Object extractKey(Message<?> message) {
if(message.getPayload() instanceof BaseUser) {
BaseUser user = (BaseUser) message.getPayload();
return user.getId();
}
return 10;
}
}
update 2: Solution that worked for me add partitioncount to bindings and autoaddpartitions to true in binder:
spring:
logging:
level: info
cloud:
stream:
bindings:
user-out:
destination: user
contentType: application/json
producer:
partition-key-expression: headers['partitionKey']
partition-count: 4
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
autoAddPartitions: true
Upvotes: 3
Views: 5270
Reputation: 6106
Also, note; The partitionSelectorClass
. has been deprecated for a while now and have been removed in the current master (won't be available in 3.0.0) in favor of partitionSelectorName
- https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream.html#spring-cloud-stream-overview-partitioning
Upvotes: 0
Reputation: 174554
There is no property partitioned
; the getter depends on other properties...
public boolean isPartitioned() {
return this.partitionKeyExpression != null
|| this.partitionKeyExtractorName != null;
}
partitionSelectorClass: config.UserPartitioner
The UserPartitioner
is a Kafka Partitioner
- it determines which consumers get which partitions (on the consumer side)
The partitionSelectorClass
has to be a PartitionSelectorStrategy
- it determines which partition a record is sent to (on the producer side).
These are completely different objects.
If you really want to customize the way partitions are distributed across consumer instances, that is a Kafka concern and has nothing to do with Spring.
Furthermore, all consumer bindings in the same binder will use the same Partitioner
. You would have to configure multiple binders to have different Partitioner
s.
Given your question, I think you are simply confusing Partitioner
with PartitionSelectorStrategy
and you need the latter.
Upvotes: 2