user1477232

Reputation: 457

How to implement a custom Kafka partitioner using Spring Cloud Stream

I am trying to implement a custom Kafka partitioner using Spring Cloud Stream bindings. I only want custom partitioning for the user topic; the company topic should be left alone (Kafka will use DefaultPartitioner for it).

My bindings configuration:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json

As per the reference documentation (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#_partitioning_with_the_kafka_binder), I modified the configuration to this:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json
          producer:
            partitioned: true
            partitionSelectorClass: config.UserPartitioner

I post the message to the stream like this:

public void postUserStream(User user) throws ServiceException {
    try {
        LOG.info("Posting User {} into Kafka stream...", user);
        MessageChannel messageChannel = messageStreams.outboundUser();
        messageChannel.send(MessageBuilder.withPayload(user)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    } catch (Exception ex) {
        LOG.error("Error while populating User stream into Kafka.. ", ex);
        throw ex;
    }
}

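My UserPartitioner class:

import java.nio.charset.StandardCharsets;
import java.util.Objects;

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;

public class UserPartitioner extends DefaultPartitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        String partitionKey = null;
        if (Objects.nonNull(value)) {
            // Build a composite key from the company id and the user id.
            User user = (User) value;
            partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
            // Use an explicit charset so the key bytes do not depend on the platform default.
            keyBytes = partitionKey.getBytes(StandardCharsets.UTF_8);
        }
        return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
    }
}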

I end up receiving the following exception:

Description: Failed to bind properties under 'spring.cloud.stream.bindings.user-out.producer' to org.springframework.cloud.stream.binder.ProducerProperties:

Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned

Action: Update your application's configuration

Any reference link on how to set up custom partitioning using message binders would be helpful.

Edit: Based on the documentation, I also tried the following:

        user-out:
          destination: user
          contentType: application/json
          producer:
            partitionKeyExtractorClass: config.SimpleUserPartitioner

@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        if (message.getPayload() instanceof BaseUser) {
            BaseUser user = (BaseUser) message.getPayload();
            return user.getId();
        }
        return 10;
    }
}
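
For comparison, the composite key that UserPartitioner builds (company id, underscore, user id) could also be produced by a key extractor. The following is only a minimal sketch under stated assumptions: KeyExtractor is a local stand-in for Spring's PartitionKeyExtractorStrategy (which receives a Message<?> rather than a bare payload), and the User class with two long fields is hypothetical, since the real class is not shown in the question.

```java
// Hypothetical payload type; the real User class is not shown in the question.
final class User {
    private final long companyId;
    private final long id;

    User(long companyId, long id) {
        this.companyId = companyId;
        this.id = id;
    }

    long getCompanyId() { return companyId; }
    long getId() { return id; }
}

// Local stand-in for PartitionKeyExtractorStrategy so the sketch compiles without Spring.
interface KeyExtractor {
    Object extractKey(Object payload);
}

final class UserKeyExtractor implements KeyExtractor {
    // Assumption: payloads that are not a User all share one fixed fallback key.
    static final String FALLBACK_KEY = "unknown";

    @Override
    public Object extractKey(Object payload) {
        if (payload instanceof User) {
            User user = (User) payload;
            // Same composite key shape as in UserPartitioner: companyId_userId.
            return user.getCompanyId() + "_" + user.getId();
        }
        return FALLBACK_KEY;
    }
}
```

For example, new UserKeyExtractor().extractKey(new User(7L, 42L)) yields the string 7_42.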

Update 2: The solution that worked for me was to add partition-count to the binding's producer properties and set autoAddPartitions to true on the binder:

spring:
  logging:
    level: info
  cloud:
    stream:
      bindings:
        user-out:
          destination: user
          contentType: application/json
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 4

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
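
With partition-key-expression and partition-count set as above, the binder evaluates the expression against each outgoing message and maps the resulting key to a partition index. The Spring Cloud Stream documentation describes the default mapping as key.hashCode() % partitionCount. The sketch below reproduces that arithmetic in plain Java; the class and method names are hypothetical, and the absolute value is an assumption added here to keep the index non-negative.

```java
final class DefaultPartitionMath {
    private DefaultPartitionMath() { }

    // Maps a key to an index in [0, partitionCount) using hashCode modulo the count.
    static int partitionFor(Object key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // |hash % n| is always smaller than n, so Math.abs cannot overflow here.
        return Math.abs(key.hashCode() % partitionCount);
    }

    public static void main(String[] args) {
        // partition-count: 4, as in the configuration above.
        for (Object key : new Object[] {10, 11, "7_42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

Note that headers['partitionKey'] reads a message header, so every message sent to user-out needs a partitionKey header set by the sender.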

Upvotes: 3

Views: 5270

Answers (2)

Oleg Zhurakousky

Reputation: 6106

Also note that partitionSelectorClass has been deprecated for a while now and has been removed in the current master (it won't be available in 3.0.0) in favor of partitionSelectorName: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.0.M1/spring-cloud-stream.html#spring-cloud-stream-overview-partitioning

Upvotes: 0

Gary Russell

Reputation: 174554

There is no property partitioned; the getter depends on other properties...

public boolean isPartitioned() {
    return this.partitionKeyExpression != null
            || this.partitionKeyExtractorName != null;
}

partitionSelectorClass: config.UserPartitioner

The UserPartitioner is a Kafka Partitioner - it determines which consumers get which partitions (on the consumer side)

The partitionSelectorClass has to be a PartitionSelectorStrategy - it determines which partition a record is sent to (on the producer side).

These are completely different objects.

If you really want to customize the way partitions are distributed across consumer instances, that is a Kafka concern and has nothing to do with Spring.

Furthermore, all consumer bindings in the same binder will use the same Partitioner. You would have to configure multiple binders to have different Partitioners.

Given your question, I think you are simply confusing Partitioner with PartitionSelectorStrategy and you need the latter.
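
To make the producer-side role concrete, here is a minimal sketch of a selector. PartitionSelector is a local stand-in that mirrors the selectPartition(Object key, int partitionCount) method of PartitionSelectorStrategy so the example compiles without Spring. The routing rule (send all users of one company to the same partition, assuming keys shaped like companyId_userId) is hypothetical and only for illustration.

```java
// Local stand-in mirroring PartitionSelectorStrategy#selectPartition.
interface PartitionSelector {
    int selectPartition(Object key, int partitionCount);
}

final class CompanyPartitionSelector implements PartitionSelector {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        String text = String.valueOf(key);
        int separator = text.indexOf('_');
        // Use only the company part of the key when present, otherwise the whole key.
        String companyPart = separator >= 0 ? text.substring(0, separator) : text;
        // floorMod keeps the result in [0, partitionCount) even for negative hash codes.
        return Math.floorMod(companyPart.hashCode(), partitionCount);
    }
}
```

With this rule, the keys 7_42 and 7_99 land on the same partition because only the company part is hashed. In a real application the class would implement PartitionSelectorStrategy itself.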

Upvotes: 2
