Reputation: 1239
I am trying to setup a Spring Boot Application with a Kafka Client to use SSL. I have my keystore.jks and truststore.jks stored on a filesystem(on a docker container) because of this: https://github.com/spring-projects/spring-kafka/issues/710
Here is my application.yml:
spring:
kafka:
ssl:
key-password: pass
keystore-location: /tmp/kafka.client.keystore.jks
keystore-password: pass
truststore-location: /tmp/kafka.client.truststore.jks
truststore-password: pass
But when I start the application ( in a docker container) it says:
Caused by: java.lang.IllegalStateException: Resource 'class path resource [tmp/kafka.client.keystore.jks]' must be on a file system
[..]
Caused by: java.io.FileNotFoundException: class path resource [tmp/kafka.client.keystore.jks] cannot be resolved to URL because it does not exist
I checked on the container and the .jks are there in /tmp .
I cannot understand how to pass .jks to spring boot.
UPDATE 06/07/2018
This is my dockerfile
FROM openjdk:8-jdk-alpine
VOLUME /tmp
COPY ssl/kafka.client.keystore.jks /tmp
COPY ssl/kafka.client.truststore.jks /tmp
ARG JAR_FILE
ADD ${JAR_FILE} app.jar
ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]
Upvotes: 13
Views: 71824
Reputation: 4455
I was looking for loading keystore/truststore through classpath and here is one of the first links I got. I was able to find a solution and since I was using Spring Boot and Spring Kafka - configuration only with properties - I was looking for a solution like this, so this answer might help other people as well. So, there's a BeanPostProcessor class that you can implement. It provides two methods:
postProcessAfterInitialization
and postProcessBeforeInitialization
Factory hook that allows for custom modification of new bean instances — for example, checking for marker interfaces or wrapping beans with proxies. Typically, post-processors that populate beans via marker interfaces or the like will implement postProcessBeforeInitialization(java.lang.Object, java.lang.String), while post-processors that wrap beans with proxies will normally implement postProcessAfterInitialization(java.lang.Object, java.lang.String).
I was using Spring Boot with Spring Kafka and I only wanted a change for local profile.
In my code example, I was using it to override Kafka Location properties, because for SSL it doesn't read from classpath.
So this was the code:
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
import java.io.IOException;
import java.util.Arrays;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration implements BeanPostProcessor {
@Value("${spring.kafka.ssl.key-store-location:}")
private Resource keyStoreResource;
@Value("${spring.kafka.properties.schema.registry.ssl.truststore.location:}")
private Resource trustStoreResource;
private final Environment environment;
@SneakyThrows
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof KafkaProperties) {
KafkaProperties kafkaProperties = (KafkaProperties) bean;
if(isLocalProfileActive()) {
configureStoreLocation(kafkaProperties);
}
}
return BeanPostProcessor.super.postProcessAfterInitialization(bean, beanName);
}
private boolean isLocalProfileActive() {
return Arrays.stream(environment.getActiveProfiles()).anyMatch(profile -> "local".equals(profile));
}
private void configureStoreLocation(KafkaProperties kafkaProperties) throws IOException {
kafkaProperties.getSsl().setKeyStoreLocation(new FileSystemResource(keyStoreResource.getFile().getAbsolutePath()));
kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, keyStoreResource.getFile().getAbsolutePath());
kafkaProperties.getSsl().setTrustStoreLocation(new FileSystemResource(trustStoreResource.getFile().getAbsolutePath()));
kafkaProperties.getProperties().put(SchemaRegistryClientConfig.CLIENT_NAMESPACE + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreResource.getFile().getAbsolutePath());
}
}
This way I could have on my properties file:
spring.kafka.ssl.key-store-location=classpath:mykeystore.jks
And the code would get the absolute path from that and set it. It makes also possible to filter based on profiles.
It's important to mention that BeanPostProcessor runs for EVERY bean, so make sure you filter what you want.
Upvotes: 3
Reputation: 380
If anyone is still looking at this, try prepending file:// to the file path:
truststorelocation: "file:///tmp/kafka.client.keystore.jks"
The error is complaining about the lack of a URL - adding a protocol (file://) makes the path a URL (speaking very basically)
Upvotes: 29
Reputation: 39978
According to discussion and to enable kafka ssl configuration, first need to enable and set ssl properties in consumerFactory
@Bean
public ConsumerFactory<String, ReportingTask> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializable.class);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxRecords);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offSet);
if (sslEnabled) {
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", trustStoreLocation);
props.put("ssl.truststore.password", trustStorePassword);
props.put("ssl.key.password", keyStorePassword);
props.put("ssl.keystore.password", keyStorePassword);
props.put("ssl.keystore.location", keyStoreLocation);
}
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Task.class));
}
And copy the certificates into docker container
COPY ssl/stage/* /var/lib/kafka/stage/
Upvotes: 20