Reputation: 21
We have an API that connects to multiple Kafka topics to produce and consume messages. We use spring-kafka and, apart from the topic name (which can be provided at runtime for both consumers and producers), we have another property that differs per topic: the encryption key used for serializing/deserializing Kafka messages. This raises the issue that we cannot reuse the same Kafka producer/consumer configuration for all topics.
The initial approach that we came up with is:
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaProducerConfig {

    private final KafkaProperties properties;

    @Bean(name = TOPIC_1_PRODUCER)
    public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic1() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_1");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
    }

    @Bean(name = TOPIC_2_PRODUCER)
    public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic2() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_2");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
    }

    @Bean(name = TOPIC_3_PRODUCER)
    public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic3() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_3");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
    }
}
With this approach, we need a new KafkaTemplate bean for every topic we produce to, so basically one Kafka template per topic, with the corresponding bean injected into the service that produces messages for that particular topic.
The second approach would be to use only one KafkaTemplate bean (one Kafka producer) and provide the encryption key at runtime. For this one I had a look at "Any possibility that I can provide configuration details at run time to producer and consumer in spring-kafka?" and came up with the implementation below:
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaProducerConfig {

    private final KafkaProperties properties;

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }
}
And the publisher service:
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private final ProducerFactory<String, GenericRecord> producerFactory;
    private final TopicResolver topicResolver;

    public void sendMessage(GenericRecord genericRecord) {
        TopicInfo topicInfo = topicResolver.resolve(genericRecord);
        Map<String, Object> configOverrides = new HashMap<>();
        configOverrides.put(ENCRYPTION_KEY, "encryption_key_for_resolved_topic");
        KafkaTemplate<String, GenericRecord> kafkaTemplate = new KafkaTemplate<>(producerFactory, configOverrides);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topicInfo.getTopicName(), genericRecord);
        ListenableFuture<SendResult<String, GenericRecord>> listenableFuture = kafkaTemplate.send(producerRecord);
        listenableFuture.addCallback(new KafkaSendCallback<String, GenericRecord>() {

            @Override
            public void onSuccess(SendResult<String, GenericRecord> result) {
                log.info("Kafka message successfully sent");
            }

            @Override
            public void onFailure(@NonNull KafkaProducerException ex) {
                log.error("Kafka message failed to be sent with error {}", ex.getMessage());
            }
        });
    }
}
With this approach, a new KafkaTemplate instance is created for each message produced, which I suspect is not how the framework is intended to be used, and I don't know how this could affect overall Kafka performance.
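A possible middle ground, sketched here only as an assumption (it reuses the `producerFactory` from the second approach, and `topicInfo.getEncryptionKey()` is a hypothetical accessor for the per-topic key), would be to cache one KafkaTemplate per topic so instances are reused instead of being created per message:

```java
private final Map<String, KafkaTemplate<String, GenericRecord>> templates = new ConcurrentHashMap<>();

private KafkaTemplate<String, GenericRecord> templateFor(TopicInfo topicInfo) {
    // Build the template (with its topic-specific encryption key) only once
    // per topic; subsequent calls for the same topic reuse the cached instance.
    return templates.computeIfAbsent(topicInfo.getTopicName(), topic -> {
        Map<String, Object> configOverrides = Map.of(ENCRYPTION_KEY, topicInfo.getEncryptionKey());
        return new KafkaTemplate<>(producerFactory, configOverrides);
    });
}
```

This bounds the number of producers to the number of topics rather than the number of messages, but it is still one underlying producer per topic rather than a single shared one.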
What is your view of the two approaches presented? Is there any other possible solution to provide the configuration at runtime while using one KafkaTemplate bean/one Kafka producer?
We have the same need on the consumer side.
Upvotes: 0
Views: 1400
Reputation: 174689
You can use a single KafkaTemplate configured with a DelegatingByTopicSerializer.
https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-topic
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            new IntegerSerializer(),
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer()),
                    new JsonSerializer<Object>())); // default
}
Upvotes: 0