I am creating a Kafka-based Flink streaming application and am trying to create an associated KafkaSource connector to read Kafka data.
For example:
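    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.kafka.common.config.SslConfigs;

    final KafkaSource<String> source = KafkaSource.<String>builder()
        // standard source builder setters
        // ...
        // relative path, resolved on whichever node instantiates the Kafka client
        .setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "truststore.jks")
        .build();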
The truststore.jks file is created locally on the job manager node before the application is executed, and I've verified that it exists and is correctly populated. My problem is that, in a distributed Flink application, this truststore.jks does not automatically exist on the task worker nodes as well, so the above code results in a FileNotFoundException when executed.
What I've tried:
- Using env.registerCacheFile and getRuntimeContext().getDistributedCache().getFile() to distribute the file to all nodes, but since the graph is still being built and the application is not yet running, the RuntimeContext is not available at this stage.
- Looking for a KafkaSource hook to do this, but I haven't found any such functionality in the docs.

What is the best way to make this file available to task worker nodes during the source initialization?
I have read similar questions posted here before:
- [...] RuntimeContext at this point in the application.
- [...] KafkaSource in the docs.
Update:
I was able to work around this issue by using the ssl.truststore.certificates configuration field instead. This allows me to supply a base64-encoded (PEM) representation of the certificate in the underlying truststore.jks rather than a local file path.
(I also had to update my kafka-clients dependency to 2.7.x or later, as this configuration is not available in older versions of the library.)
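For anyone wanting to reproduce the workaround, here is a rough sketch of how the PEM string could be produced from an existing truststore using only the JDK. The class name TruststorePem and its helper methods are hypothetical names made up for this illustration, and the sketch assumes the truststore is a JKS file that holds trusted certificate entries. As far as I know, the Kafka client also expects ssl.truststore.type to be set to PEM when ssl.truststore.certificates is used; the property keys are written as plain strings here so the snippet does not depend on kafka-clients.

```java
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.util.Base64;
import java.util.Collections;
import java.util.Properties;

// Hypothetical helper, not part of Flink or Kafka.
final class TruststorePem {

    // Wraps DER-encoded bytes in PEM armor with 64-character Base64 lines.
    public static String toPem(byte[] der) {
        Base64.Encoder encoder =
                Base64.getMimeEncoder(64, "\n".getBytes(StandardCharsets.US_ASCII));
        return "-----BEGIN CERTIFICATE-----\n"
                + encoder.encodeToString(der)
                + "\n-----END CERTIFICATE-----\n";
    }

    // Concatenates the certificate stored under each alias as PEM blocks.
    public static String trustedCertsAsPem(KeyStore trustStore) throws Exception {
        StringBuilder pem = new StringBuilder();
        for (String alias : Collections.list(trustStore.aliases())) {
            Certificate cert = trustStore.getCertificate(alias);
            if (cert != null) {
                pem.append(toPem(cert.getEncoded()));
            }
        }
        return pem.toString();
    }

    // Builds the client properties; the PEM type setting is an assumption noted above.
    public static Properties pemTruststoreProperties(String pem) {
        Properties props = new Properties();
        props.setProperty("ssl.truststore.type", "PEM");
        props.setProperty("ssl.truststore.certificates", pem);
        return props;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: TruststorePem <truststore.jks> <password>");
            return;
        }
        KeyStore trustStore = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            trustStore.load(in, args[1].toCharArray());
        }
        System.out.print(trustedCertsAsPem(trustStore));
    }
}
```

The resulting Properties object can then be handed to the source builder with setProperties(...), or the two values can be set one at a time with setProperty(...). Because the certificate travels as a string inside the job configuration, nothing has to exist on the worker nodes' file systems.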