user1393650
user1393650

Reputation: 129

org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to ObjectNode

When I used the following code:

    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
            .setProperties(kafkaProps)
            .setProperty("ssl.truststore.type",trustStoreType)
            .setProperty("ssl.truststore.password",trustStorePassword)
            .setProperty("ssl.truststore.location",trustStoreLocation)
            .setProperty("security.protocol",securityProtocol)
            .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
            .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setDeserializer(new JSONKeyValueDeserializationSchema(false))
            .setStartingOffsets
             (OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .build();

I get the following error during build incompatible types: org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema cannot be converted to org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema<org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode>

                .setDeserializer(new JSONKeyValueDeserializationSchema(false))

Someone a clue as to what is wrong?

Upvotes: 2

Views: 796

Answers (2)

user22555227
user22555227

Reputation: 1

I am passing truststore location as path and failing with below error and i noticed you are passing it as a variable, can you provide some insights on this?

error: Caused by: org.apache.kafka.common.KafkaException: Failed to load SSL keystore truststore.jks of type JKS

Upvotes: 0

user1393650
user1393650

Reputation: 129

The solution is:

    KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
            .setProperties(kafkaProps)
            .setProperty("ssl.truststore.type",trustStoreType)
            .setProperty("ssl.truststore.password",trustStorePassword)
            .setProperty("ssl.truststore.location",trustStoreLocation)
            .setProperty("security.protocol",securityProtocol)
            .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
            .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
            .setGroupId(groupId)
            .setTopics(kafkaInputTopic)
            .setDeserializer(KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(fetchMetadata)))
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .build();

Upvotes: 2

Related Questions