Merrin

Reputation: 675

Is there a code sample for multiple producers in spring kafka?

I have an application that may need multiple producers. All the code samples I have seen support a single producer whose config is read from the application properties at startup. If there are multiple producers and each one needs a different producer config, is there out-of-the-box support in Spring? Or should I just go without Spring in that case?

Upvotes: 15

Views: 23250

Answers (5)

Ashish Lahoti

Reputation: 978

Spring Boot doesn't provide out-of-the-box support for multiple producer configurations. You can write your own custom Kafka configuration to support them, for example:

kafka:
  producer:
    producer1:
      topic: topic1
      bootstrap-servers: server1:9092,server1:9093,server1:9094
      retries: 0
      acks: all
    producer2:
      topic: topic2
      bootstrap-servers: server2:9092,server2:9093,server2:9094
      retries: 2
      acks: 1
    producer3:
      ...
    producer4:
      ...

Bind this configuration to a properties class:

@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}

Use this configuration to create a KafkaTemplate bean for each producer, distinguished by the @Qualifier annotation:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  @Bean
  @Qualifier("producer1")
  public KafkaTemplate<String, Object> producer1KafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer1"));
  }

  @Bean
  @Qualifier("producer2")
  public KafkaTemplate<String, Object> producer2KafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer2"));
  }

  private ProducerFactory<String, Object> producerFactory(String producerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getProducer())) {
      KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
      if (nonNull(producerProperties)) {
        properties.putAll(producerProperties.buildProperties());
      }
    }
    log.info("Kafka Producer '{}' properties: {}", producerName, properties);
    return new DefaultKafkaProducerFactory<>(properties);
  }
}

Then use these KafkaTemplate beans to publish messages with the different producer configurations.
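
To make the layering in producerFactory(String) above concrete: the common properties are copied first and the per-producer properties are applied on top, so a per-producer key replaces a common key of the same name. Below is a minimal plain-Java sketch of just that merge step. It uses no Spring or Kafka classes, and the class name ProducerPropertyLayering, the merge helper, and the sample values are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class ProducerPropertyLayering {

    // Copies the shared settings, then lets the per-producer settings win on key clashes.
    public static Map<String, Object> merge(Map<String, Object> common,
                                            Map<String, Object> perProducer) {
        Map<String, Object> merged = new HashMap<>(common);
        if (perProducer != null) {
            merged.putAll(perProducer);
        }
        return merged;
    }

    public static void main(String[] args) {
        // Hypothetical shared defaults (standard Kafka client config keys).
        Map<String, Object> common = Map.of(
                "bootstrap.servers", "localhost:9092",
                "client.id", "demo-app");
        // Hypothetical overrides for "producer1".
        Map<String, Object> producer1 = Map.of(
                "bootstrap.servers", "server1:9092,server1:9093,server1:9094",
                "acks", "all",
                "retries", 0);

        Map<String, Object> merged = merge(common, producer1);
        System.out.println(merged.get("bootstrap.servers")); // server1:9092,server1:9093,server1:9094
        System.out.println(merged.get("client.id"));         // demo-app

        // A producer with no entry of its own falls back to the common settings.
        System.out.println(merge(common, null).get("bootstrap.servers")); // localhost:9092
    }
}
```

A producer name that is missing from the YAML therefore still gets a working factory, just with the common settings only, which is worth keeping in mind when a typo in the name goes unnoticed.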

Refer to the post https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for a detailed explanation.

Upvotes: 2

andreoss

Reputation: 1800

If you still want to keep your configuration in application.yaml as usual and keep the Java configuration to a minimum, you can extend KafkaProperties.Producer.


@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {
    private final KafkaProperties common;

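    // Explicit bean names avoid a clash with the identically named methods in SecondProducer.
    @Qualifier("producer-1")
    @Bean("producer1Factory")
    public ProducerFactory<?, ?> producerFactory() {
        // Start from the shared spring.kafka.* producer settings...
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        // ...then apply the spring.kafka.producer-1.* overrides on top.
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Qualifier("producer-1")
    @Bean("producer1KafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }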
}

@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {
    private final KafkaProperties common;

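    // Explicit bean names avoid a clash with the identically named methods in FirstProducer.
    @Qualifier("producer-2")
    @Bean("producer2Factory")
    public ProducerFactory<?, ?> producerFactory() {
        // Start from the shared spring.kafka.* producer settings...
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        // ...then apply the spring.kafka.producer-2.* overrides on top.
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);
    }

    @Qualifier("producer-2")
    @Bean("producer2KafkaTemplate")
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());
    }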
}

Upvotes: 9

vinilpj

Reputation: 178

Starting with version 2.5, you can use a RoutingKafkaTemplate to select the producer at runtime, based on the destination topic name. https://docs.spring.io/spring-kafka/reference/html/#routing-template
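
As far as I recall from the Spring for Apache Kafka documentation, RoutingKafkaTemplate is built from a Map of java.util.regex.Pattern to ProducerFactory<Object, Object>, and the first pattern that matches the topic name decides which factory is used, so the map should be ordered (for example a LinkedHashMap) with the more specific patterns first. The plain-Java sketch below only illustrates that first-match selection rule. It is not the Spring class itself, and TopicRouting, select, and the "producer1"/"producer2" labels (standing in for producer factories) are hypothetical names:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class TopicRouting {

    // Returns the value of the first pattern, in map iteration order,
    // that matches the whole topic name.
    public static <T> T select(Map<Pattern, T> routes, String topic) {
        for (Map.Entry<Pattern, T> route : routes.entrySet()) {
            if (route.getKey().matcher(topic).matches()) {
                return route.getValue();
            }
        }
        throw new IllegalStateException("No route matches topic: " + topic);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the specific pattern is tried first.
        Map<Pattern, String> routes = new LinkedHashMap<>();
        routes.put(Pattern.compile("topic2"), "producer2");
        routes.put(Pattern.compile(".+"), "producer1"); // catch-all goes last

        System.out.println(select(routes, "topic2")); // producer2
        System.out.println(select(routes, "topic1")); // producer1
    }
}
```

If the catch-all pattern were inserted first, every topic would resolve to it and the specific entry would never be reached, which is why the ordering matters.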

Upvotes: 4

Girdhar Singh Rathore

Reputation: 5585

You will have to create two different ProducerFactory beans. Below is an example:

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;

    @Configuration
    public class KafkaProducerConfig {


        @Bean
        public ProducerFactory<String, String> confluentProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9092");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }


        @Bean
        public ProducerFactory<String, String> clouderaProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9094");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean(name = "confluent")
        public KafkaTemplate<String, String> confluentKafkaTemplate() {
            return new KafkaTemplate<>(confluentProducerFactory());
        }

        @Bean(name = "cloudera")
        public KafkaTemplate<String, String> clouderaKafkaTemplate() {
            return new KafkaTemplate<>(clouderaProducerFactory());
        }

    }




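import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Registered as a Spring bean so that the @Autowired fields are populated.
@Component
public class ProducerExample {

    // Each template is selected by the bean name declared in KafkaProducerConfig.
    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate<String, String> clouderaKafkaTemplate;

    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate<String, String> confluentKafkaTemplate;

    public void send() {
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra", "hey there.. cloudera");
    }

}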

Upvotes: 25

Artem Bilan

Reputation: 121177

You can create several Producer instances (KafkaTemplate) via the same ProducerFactory.

If you need different Kafka configurations, you’ll need different ProducerFactory instances.

Upvotes: 12
