Justin

Reputation: 1239

Spring Kafka SSL setup in Spring Boot application.yml

I am trying to set up a Spring Boot application with a Kafka client that uses SSL. My keystore.jks and truststore.jks are stored on the file system (in a Docker container) because of this issue: https://github.com/spring-projects/spring-kafka/issues/710

Here is my application.yml:

spring:
  kafka:
      ssl:
        key-password: pass
        keystore-location: /tmp/kafka.client.keystore.jks
        keystore-password: pass
        truststore-location: /tmp/kafka.client.truststore.jks
        truststore-password: pass

But when I start the application (in a Docker container), it fails with:

Caused by: java.lang.IllegalStateException: Resource 'class path resource [tmp/kafka.client.keystore.jks]' must be on a file system
[..]
Caused by: java.io.FileNotFoundException: class path resource [tmp/kafka.client.keystore.jks] cannot be resolved to URL because it does not exist

I checked the container, and the .jks files are there in /tmp.

I cannot understand how to pass the .jks files to Spring Boot.

UPDATE 06/07/2018

This is my Dockerfile:

FROM openjdk:8-jdk-alpine
VOLUME /tmp
COPY ssl/kafka.client.keystore.jks /tmp
COPY ssl/kafka.client.truststore.jks /tmp
ARG JAR_FILE
ADD ${JAR_FILE} app.jar
ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]

Upvotes: 13

Views: 71824

Answers (3)

Raphael Amoedo

Reputation: 4455

I was looking for a way to load the keystore/truststore from the classpath, and this question was one of the first results I found. I was using Spring Boot and Spring Kafka, configured through properties only, and wanted a solution that fit that setup, so this answer might help other people as well. The approach is to implement a BeanPostProcessor, which provides two methods:

postProcessAfterInitialization and postProcessBeforeInitialization

Factory hook that allows for custom modification of new bean instances — for example, checking for marker interfaces or wrapping beans with proxies. Typically, post-processors that populate beans via marker interfaces or the like will implement postProcessBeforeInitialization(java.lang.Object, java.lang.String), while post-processors that wrap beans with proxies will normally implement postProcessAfterInitialization(java.lang.Object, java.lang.String).

I was using Spring Boot with Spring Kafka and only wanted the change to apply to the local profile.

In my code example, I use it to override the Kafka store location properties, because the SSL store locations are not read from the classpath.

So this was the code:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {

  @Value("${spring.kafka.ssl.key-store-location:}")
  private Resource keyStoreResource;
  @Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
  private Resource trustStoreResource;
  private final Environment environment;

  @SneakyThrows
  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    if (bean instanceof KafkaProperties) {
      KafkaProperties kafkaProperties = (KafkaProperties) bean;
      if(isLocalProfileActive()) {
        configureStoreLocation(kafkaProperties);
      }
    }
    return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
  }

  private boolean isLocalProfileActive() {
    return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
  }

  private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
    kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
    kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
    kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
  }

}

This way I could have this in my properties file:

spring.kafka.ssl.key-store-location=classpath:mykeystore.jks

The code then resolves the absolute path from that value and sets it. It also makes it possible to filter based on profiles.

It's important to mention that a BeanPostProcessor runs for EVERY bean, so make sure you filter for the beans you want.
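One caveat, as far as I know: calling getFile() on a class path resource only works when the resource is a real file on disk (for example when running from an IDE), not when it is packed inside a jar. A possible workaround, which is a different technique from the code above, is to copy the store to a temporary file and hand that path to Kafka. Here is a minimal JDK-only sketch; the class name StoreExtractor, its method names, and the "kafka-store-" prefix are made up for illustration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper (not part of Spring or Kafka): materializes a store as a temp file.
class StoreExtractor {

    // Copies the stream into a new temp file and returns its absolute path.
    static Path copyToTempFile(InputStream in, String suffix) throws IOException {
        Path target = Files.createTempFile("kafka-store-", suffix);
        target.toFile().deleteOnExit(); // best-effort cleanup when the JVM exits
        // createTempFile already created the file, so it has to be replaced
        Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        return target.toAbsolutePath();
    }

    // Looks the resource up on the classpath (works inside a jar too) and copies it out.
    static Path extract(String classpathName, String suffix) throws IOException {
        try (InputStream in = StoreExtractor.class.getClassLoader().getResourceAsStream(classpathName)) {
            if (in == null) {
                throw new IOException("Not found on classpath: " + classpathName);
            }
            return copyToTempFile(in, suffix);
        }
    }
}
```

The returned path could then be used wherever the code above uses keyStoreResource.getFile().getAbsolutePath(), e.g. StoreExtractor.extract("mykeystore.jks", ".jks").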

Upvotes: 3

Cameron Sours

Reputation: 380

If anyone is still looking at this, try prepending file:// to the file path:

truststore-location: "file:///tmp/kafka.client.truststore.jks"

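The error is complaining that the path cannot be resolved to a URL. Without a prefix, Spring treats the value as a class path resource (note the "class path resource [tmp/kafka.client.keystore.jks]" in the stack trace). Adding a protocol (file://) makes the path a file system URL (speaking very basically).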
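To illustrate the difference, here is a minimal JDK-only sketch. It does not use Spring's resource loader, whose resolution logic is more involved, and the class name LocationPrefixDemo is made up for illustration. It only shows that a value with the file: prefix carries an explicit scheme and maps straight to a file system path, while the bare path has no scheme, so whatever loads it has to decide where to look.

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, for illustration only.
class LocationPrefixDemo {

    // Returns the URI scheme of a location string, or null when it has none.
    static String schemeOf(String location) {
        return URI.create(location).getScheme();
    }

    // Converts a "file:" location into a file system path.
    static Path toPath(String fileLocation) {
        return Paths.get(URI.create(fileLocation));
    }

    public static void main(String[] args) {
        // Bare path: no scheme, so this prints null
        System.out.println(schemeOf("/tmp/kafka.client.truststore.jks"));
        // Prefixed path: the scheme is "file"
        System.out.println(schemeOf("file:///tmp/kafka.client.truststore.jks"));
        // The prefixed form converts directly to a file system path
        System.out.println(toPath("file:///tmp/kafka.client.truststore.jks"));
    }
}
```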

Upvotes: 29

Ryuzaki L

Reputation: 39978

Based on the discussion, to enable the Kafka SSL configuration you first need to set the SSL properties in the consumerFactory:

@Bean
public ConsumerFactory<String, ReportingTask> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offSet);
    if (sslEnabled) {
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", trustStoreLocation);
        props.put("ssl.truststore.password", trustStorePassword);

        props.put("ssl.key.password", keyStorePassword);
        props.put("ssl.keystore.password", keyStorePassword);
        props.put("ssl.keystore.location", keyStoreLocation);
    }
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Task.class));
}

Then copy the certificates into the Docker container:

COPY ssl/stage/* /var/lib/kafka/stage/
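Since the original problem was a store file that could not be found, it may help to check the files before handing the paths to Kafka. Below is a minimal JDK-only sketch of that idea, assuming plain file system paths (no classpath: or file: prefix). The class KafkaSslProps and its methods are made up for illustration; only the property keys are the real Kafka client settings used above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the SSL entries for a Kafka client config map.
class KafkaSslProps {

    static Map<String, Object> build(String trustStoreLocation, String trustStorePassword,
                                     String keyStoreLocation, String keyStorePassword,
                                     String keyPassword) {
        // Fail fast with a clear message instead of a late error inside the client
        requireReadable(trustStoreLocation);
        requireReadable(keyStoreLocation);

        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", trustStoreLocation);
        props.put("ssl.truststore.password", trustStorePassword);
        props.put("ssl.keystore.location", keyStoreLocation);
        props.put("ssl.keystore.password", keyStorePassword);
        props.put("ssl.key.password", keyPassword);
        return props;
    }

    // Throws if the given plain file system path cannot be read.
    static void requireReadable(String location) {
        Path path = Paths.get(location);
        if (!Files.isReadable(path)) {
            throw new IllegalArgumentException("Store file not readable: " + path.toAbsolutePath());
        }
    }
}
```

In the consumerFactory above, the body of the if (sslEnabled) block could then become a single props.putAll(KafkaSslProps.build(...)) call, and the same map could be reused for a producer factory.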

Upvotes: 20
