Gautam Baindur

Reputation: 31

Spring Kafka-ConfigException: Invalid value TopicNameStrategy for configuration key.subject.name.strategy: Class TopicNameStrategy could not be found

We just deployed a Kafka producer to prod and are facing a weird issue that didn't pop up in non-prod. The service is a Spring Boot microservice that receives a REST HTTP request and uses Spring Kafka to publish an event onto a topic. The microservice is hosted on AWS ECS and runs on Java 11. Below is the error:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:441)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createRawProducer(DefaultKafkaProducerFactory.java:743)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:584)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.doCreateProducer(DefaultKafkaProducerFactory.java:544)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:519)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:513)
    at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:683)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:569)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:386)
    at com.nab.ms.hs.lodgement.producer.HardshipCaseSubmitEventProducer.publishHardshipCaseSubmitEvent(HardshipCaseSubmitEventProducer.java:47)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.lambda$publishHardshipCaseSubmitEvent$0(CreateHardshipRequestService.java:108)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processAccountRequest(CreateHardshipRequestService.java:103)
    at com.nab.ms.hs.lodgement.application.CreateHardshipRequestService.processNewHardshipRequest(CreateHardshipRequestService.java:75)
    at com.nab.ms.hs.lodgement.application.HardshipNewRequestService.lambda$processNewHardshipRequest$0(HardshipNewRequestService.java:46)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1728)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.ExceptionInInitializerError: null
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:50)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:368)
    ... 22 common frames omitted
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.kafka.serializers.subject.TopicNameStrategy for configuration key.subject.name.strategy: Class io.confluent.kafka.serializers.subject.TopicNameStrategy could not be found.
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:729)

Below are the dependencies reported when I run the ./gradlew dependencies --configuration=runtimeClasspath command:

+--- org.springframework.kafka:spring-kafka:2.7.1
|    |    +--- org.springframework:spring-context:5.3.7 (*)
|    |    +--- org.springframework:spring-messaging:5.3.7
|    |    |    +--- org.springframework:spring-beans:5.3.7 (*)
|    |    |    \--- org.springframework:spring-core:5.3.7 (*)
|    |    +--- org.springframework:spring-tx:5.3.7 (*)
|    |    +--- org.springframework.retry:spring-retry:1.3.1
|    |    +--- org.apache.kafka:kafka-clients:2.7.1
|    |    |    +--- com.github.luben:zstd-jni:1.4.5-6
|    |    |    +--- org.lz4:lz4-java:1.7.1
|    |    |    +--- org.xerial.snappy:snappy-java:1.1.7.7
|    |    |    \--- org.slf4j:slf4j-api:1.7.30
|    |    +--- org.jetbrains.kotlin:kotlin-stdlib:1.4.32
|    |    |    +--- org.jetbrains.kotlin:kotlin-stdlib-common:1.4.32
|    |    |    \--- org.jetbrains:annotations:13.0
|    |    \--- com.google.code.findbugs:jsr305:3.0.2
|    +--- org.apache.avro:avro:1.10.2
|    |    +--- com.fasterxml.jackson.core:jackson-core:2.12.2 -> 2.11.4
|    |    +--- com.fasterxml.jackson.core:jackson-databind:2.12.2 -> 2.11.4 (*)
|    |    +--- org.apache.commons:commons-compress:1.20
|    |    \--- org.slf4j:slf4j-api:1.7.30
|    +--- org.apache.kafka:kafka-clients:2.7.1 (*)
|    +--- org.apache.kafka:kafka-streams:2.7.1 -> 2.6.2
|    |    +--- org.apache.kafka:kafka-clients:2.6.2 -> 2.7.1 (*)
|    |    +--- org.apache.kafka:connect-json:2.6.2
|    |    |    +--- org.apache.kafka:connect-api:2.6.2
|    |    |    |    +--- org.apache.kafka:kafka-clients:2.6.2 -> 2.7.1 (*)
|    |    |    |    \--- org.slf4j:slf4j-api:1.7.30
|    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.10.5.1 -> 2.11.4 (*)
|    |    |    +--- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.10.5 -> 2.11.4 (*)
|    |    |    \--- org.slf4j:slf4j-api:1.7.30
|    |    +--- org.slf4j:slf4j-api:1.7.30
|    |    \--- org.rocksdb:rocksdbjni:5.18.4
|    +--- io.confluent:common-config:6.1.1
|    |    +--- io.confluent:common-utils:6.1.1
|    |    |    \--- org.slf4j:slf4j-api:1.7.30
|    |    \--- org.slf4j:slf4j-api:1.7.30
|    +--- io.confluent:common-utils:6.1.1 (*)
|    +--- io.confluent:kafka-avro-serializer:6.1.1
|    |    +--- org.apache.avro:avro:1.9.2 -> 1.10.2 (*)
|    |    +--- io.confluent:kafka-schema-serializer:6.1.1
|    |    |    +--- io.confluent:kafka-schema-registry-client:6.1.1
|    |    |    |    +--- org.apache.kafka:kafka-clients:6.1.1-ccs -> 2.7.1 (*)
|    |    |    |    +--- org.apache.avro:avro:1.9.2 -> 1.10.2 (*)
|    |    |    |    +--- com.fasterxml.jackson.core:jackson-databind:2.10.5.1 -> 2.11.4 (*)
|    |    |    |    +--- jakarta.ws.rs:jakarta.ws.rs-api:2.1.6
|    |    |    |    +--- org.glassfish.jersey.core:jersey-common:2.31 -> 2.32
|    |    |    |    |    +--- jakarta.ws.rs:jakarta.ws.rs-api:2.1.6
|    |    |    |    |    +--- jakarta.annotation:jakarta.annotation-api:1.3.5
|    |    |    |    |    +--- org.glassfish.hk2.external:jakarta.inject:2.6.1
|    |    |    |    |    \--- org.glassfish.hk2:osgi-resource-locator:1.0.3
|    |    |    |    +--- io.swagger:swagger-annotations:1.6.2
|    |    |    |    \--- io.confluent:common-utils:6.1.1 (*)
|    |    |    \--- io.confluent:common-utils:6.1.1 (*)
|    |    +--- io.confluent:kafka-schema-registry-client:6.1.1 (*)
|    |    \--- io.confluent:common-utils:6.1.1 (*)
|    +--- io.confluent:kafka-schema-registry-client:6.1.1 (*)
|    +--- io.confluent:kafka-streams-avro-serde:6.1.1
|    |    +--- io.confluent:kafka-avro-serializer:6.1.1 (*)
|    |    +--- io.confluent:kafka-schema-registry-client:6.1.1 (*)
|    |    +--- org.apache.avro:avro:1.9.2 -> 1.10.2 (*)
|    |    \--- io.confluent:common-utils:6.1.1 (*)
|    +--- org.springframework.boot:spring-boot-starter:2.4.5 -> 2.4.6 (*)

Below is the producer code:

@Service
@Slf4j
public class EventProducer {

  private final KafkaTemplate<EventKey, HardshipCaseSubmitEvent> hardshipProducer;
  
  @Value("${app.kafka.topic.hardship.case.submit.event.name}")
  private String topicName;

  public EventProducer(KafkaTemplate<EventKey, HardshipCaseSubmitEvent> hardshipProducer) {
    this.hardshipProducer = hardshipProducer;
  }

  @SneakyThrows
  public void publishHardshipCaseSubmitEvent(HardshipCaseSubmitEvent hardshipCaseSubmitEvent, HardshipData hardshipData) {
    
    ListenableFuture<SendResult<EventKey, HardshipCaseSubmitEvent>> future = hardshipProducer.send(topicName,
        EventKey.newBuilder().setCaseId(hardshipData.getHsCaseId()).build(),
        hardshipCaseSubmitEvent);
    future.addCallback(new ListenableFutureCallback<>() {
      @SneakyThrows
      @Override
      public void onFailure(@NonNull Throwable ex) {
        log.error("Exception = " + ex.getMessage() + " publishing hardshipCaseSubmitEvent for meId = " + hardshipCaseSubmitEvent.getData().getMeId() + ", correlation id=" + correlationId + ", caseId=" + hardshipData.getHsCaseId(), ex);
      }

      @Override
      public void onSuccess(SendResult<EventKey, HardshipCaseSubmitEvent> result) {
        log.info("hardshipCaseSubmitEvent event status = success, partition= {}, offset= {}, meId={}, correlation id={}, caseId={}",
            result.getRecordMetadata().partition(),
            result.getRecordMetadata().offset(),
            hardshipCaseSubmitEvent.getData().getMeId().toString(),
            correlationId,
            hardshipData.getHsCaseId());
      }

    });
    hardshipProducer.flush();
  }
}

Also note that the event is sometimes produced successfully and sometimes fails with the above error. I have logged the event body to compare the two cases and found no difference whatsoever. I have checked the WAR files in the container instances and confirmed that all the expected jar dependencies are present. The topic subjects have been set up with TopicNameStrategy, and the same is provided in the yml config as well. Please let me know if anybody has any idea.

EDIT: added the configs below:

nabkafka:
  kafka:
    allow.auto.create.topics: false
    schema-registry:
      cache-size: 2048
      auto.register.schemas: false
      key-subject-name-strategy: Topic
      value-subject-name-strategy: Topic
      subject-name-strategy: Topic
      ssl:
        protocol: SSL
        key-store-location: file:${infrastructure.services.ssl.keyStorePath}
        key-store-password: ${infrastructure.services.ssl.keyStorePassword}
        key-password: ${infrastructure.services.ssl.keyStorePassword}
        trust-store-location: file:${infrastructure.services.ssl.trustStorePath}
        trust-store-password: ${infrastructure.services.ssl.trustStorePassword}
        trust-store-type: JKS
        key-store-type: JKS
    producer:
      acks: all
      key-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        auto.register.schemas: false
    ssl:
      key-store-location: file:${infrastructure.services.ssl.keyStorePath}
      key-store-password: ${infrastructure.services.ssl.keyStorePassword}
      key-password: ${infrastructure.services.ssl.keyStorePassword}
      trust-store-location: file:${infrastructure.services.ssl.trustStorePath}
      trust-store-password: ${infrastructure.services.ssl.trustStorePassword}

Please note that we use a wrapper over Spring Kafka which works perfectly fine across the organization and even in our non-prod environment.
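
For reference, my understanding (not confirmed, since the wrapper does the mapping) is that the yml above ends up as the standard Confluent serializer properties on the raw producer config. A minimal sketch of what I would expect the wrapper to produce; the helper name is purely illustrative:

import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustrative only: the producer properties I expect our wrapper to build from the yml.
Map<String, Object> expectedProducerProps() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.ACKS_CONFIG, "all");
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
  props.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
  // "Topic" in the yml should translate to the Confluent TopicNameStrategy class,
  // i.e. the very class the ConfigException says cannot be found at runtime.
  props.put(AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());
  props.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicNameStrategy.class.getName());
  return props;
}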

Upvotes: 3

Views: 3090

Answers (1)

Sebastian

Reputation: 61

I got the same issue. As @Vichukano mentioned, it's a classloader issue related to the executor the task runs on (in this case, a ForkJoinPool). How do you define the DefaultKafkaProducerFactory you are using? Starting in spring-kafka v2.8.0, they added a flag that controls whether the serializers are configured when a new producer is created (the consumer factory has the same feature). You should configure your serializer beforehand (if you need to) and set this flag to false, e.g.:

  @Bean
  ProducerFactory<String, Object> producerFactory(SchemaRegistryClient registryClient, KafkaProperties kafkaProperties) {
    // Configure the Avro serializer up front, against the schema registry client,
    // instead of letting the factory configure it lazily on first producer creation.
    var serializer = new KafkaAvroSerializer(registryClient);
    var configs = kafkaProperties.buildProducerProperties();

    serializer.configure(configs, false); // false = configure as the value serializer

    // The last argument (configureSerializers = false) tells spring-kafka not to
    // configure the serializers again when it constructs a new producer.
    return new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), serializer, false);
  }
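
A KafkaTemplate bean can then reuse that factory so the pre-configured serializer is shared; a minimal sketch, assuming String keys and Avro values as in the factory above:

  @Bean
  KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    // Reuses the factory defined above; the KafkaAvroSerializer is configured once,
    // not re-configured each time a producer is created.
    return new KafkaTemplate<>(producerFactory);
  }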

Upvotes: 3
