Skywarp
Skywarp

Reputation: 1059

Kafka Configuration class in Spring Boot not finding keystore or truststore

I'm setting up a Kafka consumer configuration and the configuration cannot find the keystore or truststore on the classpath:

@EnableKafka
@Configuration
public class KafkaConfig {

    @Value("${kafka.ssl.keystore}")
    private String keyStorePath;
    @Value("${kafka.ssl.truststore}")
    private String trustStorePath;

    @Bean
    public ConsumerFactory<String, String> getConsumerFactory() {

        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"my-bootstrap.mydomain.com:443");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "client1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "500");
        properties.put("session.timeout.ms", "30000");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStorePath);
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password");
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStorePath);
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
        properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password");

        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory
                = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(getConsumerFactory());
        return factory;
    }
}

The keystore and truststore are both located in the directory src/main/resources/ssl in the same maven module as the configuration class.

I set up the placeholders in the application.yml as follows:

kafka:
  ssl:
    keystore: classpath:ssl/kafka-keystore.jks
    truststore: classpath:ssl/kafka-truststore.jks

However, the application fails to start with the following exception:

"org.apache.kafka.common.KafkaException: java.io.FileNotFoundException: classpath:ssl/kafka-keystore.jks (No such file or directory)"

My understanding is that using @Value enables the use of the classpath: prefix to resolve the classpath (see this link) https://www.baeldung.com/spring-classpath-file-access

Moreover, the @Value technique works just fine to resolve the keystores and truststores for the reactive WebClient configuration in the same application.

What do I need to do to resolve the classpath for the Kafka configuration? Am I missing something here?

Upvotes: 6

Views: 12483

Answers (2)

Raphael Amoedo
Raphael Amoedo

Reputation: 4455

For those like me that were using Spring Boot and Spring Kafka and don't override a DefaultKafkaConsumerFactory - only use properties for configuration -, there's a BeanPostProcessor class that you can implement. It provides two methods:

postProcessAfterInitialization and postProcessBeforeInitialization

Factory hook that allows for custom modification of new bean instances — for example, checking for marker interfaces or wrapping beans with proxies. Typically, post-processors that populate beans via marker interfaces or the like will implement postProcessBeforeInitialization(java.lang.Object, java.lang.String), while post-processors that wrap beans with proxies will normally implement postProcessAfterInitialization(java.lang.Object, java.lang.String).

I was using Spring Boot with Spring Kafka and I only wanted a change for local profile.

In my code example, I was using it to override Kafka Location properties, because for SSL it doesn't read from classpath.

So this was the code:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {

  @Value("${spring.kafka.ssl.key-store-location:}")
  private Resource keyStoreResource;
  @Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
  private Resource trustStoreResource;
  private final Environment environment;

  @SneakyThrows
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof KafkaProperties) {
      KafkaProperties kafkaProperties = (KafkaProperties) bean;
      if(isLocalProfileActive()) {
        configureStoreLocation(kafkaProperties);
      }
    }
    return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
  }

  private boolean isLocalProfileActive() {
    return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
  }

  private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
    kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
    kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
  }

}

This way I could have on my properties file:

spring.kafka.ssl.key-store-location=classpath:mykeystore.jks

And the code would get the absolute path from that and set it. It makes also possible to filter based on profiles.

It's important to mention that BeanPostProcessor runs for EVERY bean, so make sure you filter what you want.

Upvotes: 2

camtastic
camtastic

Reputation: 988

Your injecting into a String which is going to keep the "classpath:" within the String value and provide it as a property to DefaultKafkaConsumerFactory, try injecting into a spring Resource like:

import org.springframework.core.io.Resource;

@Value("classpath:path/to/file/in/classpath")
Resource resourceFile;

Then you can access the file and you could get the absolute path like:

resourceFile.getFile().getAbsolutePath()

The idea being you could provide the absolute path to DefaultKafkaConsumerFactory

But you could also try removing the "classpath:" and inject as String like your current code which might work depending on how DefaultKafkaConsumerFactory treats that property. But I can't see why absolute path above wouldn't work.

Upvotes: 10

Related Questions