Reputation: 129
When I used the following code:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(new JSONKeyValueDeserializationSchema(false))
.setStartingOffsets
(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();
I get the following error during build
incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>
.setDeserializer(new JSONKeyValueDeserializationSchema(false))
Someone a clue as to what is wrong?
Upvotes: 2
Views: 796
Reputation: 1
I am passing truststore location as path and failing with below error and i noticed you are passing it as a variable, can you provide some insights on this?
error: Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore truststore.jks of type JKS
Upvotes: 0
Reputation: 129
The solution is:
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
.setProperties(kafkaProps)
.setProperty("ssl.truststore.type",trustStoreType)
.setProperty("ssl.truststore.password",trustStorePassword)
.setProperty("ssl.truststore.location",trustStoreLocation)
.setProperty("security.protocol",securityProtocol)
.setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
.setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
.setGroupId(groupId)
.setTopics(kafkaInputTopic)
.setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.build();
Upvotes: 2