Reputation: 61
I am using a JDBC source connector in query mode. It seems that, without a specified table name, the schemas registered in the Schema Registry for the record key and record value have empty schema names and are assigned the default name "ConnectDefault", as defined in Confluent's AvroData class: https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroData.java
When running a Kafka Streams application that uses generated Avro sources and SpecificAvroSerde, I get this error:
Exception in thread "streams-app-6e39ebfd-db14-49bc-834f-afaf108a6d25-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to deserialize value for record. topic=topic-name, partition=0, offset=0
at org.apache.kafka.streams.processor.internals.SourceNodeRecordDeserializer.deserialize(SourceNodeRecordDeserializer.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:474)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:642)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:519)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 2
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class io.confluent.connect.avro.ConnectDefault specified in writer's schema whilst finding reader's schema for a SpecificRecord.
I have tried to POST a new version of both the key and value schemas for the topic, using a table name as the schema name, and to DELETE the original versions that had the "name":"ConnectDefault","namespace":"io.confluent.connect.avro" properties, with no luck. Am I missing a class named ConnectDefault, or can I specify a schema name without the namespace somewhere in the source connector?
My Kafka Streams configuration:
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
My Kafka Connect configuration:
name=source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:oracle:thin:
mode=incrementing
incrementing.column.name=id
query=QUERY
topic.prefix=topic-name
transforms=InsertKey, ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schemas.enable=true
value.converter.schema.registry.url=http://localhost:8081
Upvotes: 3
Views: 5968
Reputation: 1
By default, Kafka will look for the schema class under the name ConnectDefault.
Just create the schema class with the name "ConnectDefault" and put it in the package "io.confluent.connect.avro".
Upvotes: -1
Reputation: 61
The problem was that the schema name defaults to null when the JDBC source connector runs in query mode: https://github.com/confluentinc/kafka-connect-jdbc/issues/90
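To see why a null schema name surfaces as a missing class: the error message in the question says the deserializer could not find the class named by the writer's schema. The following is only a rough sketch of that kind of lookup, not Confluent's actual code. The class ReaderSchemaLookup and its two helper methods are hypothetical names made up for illustration. It joins the namespace and name from the registered schema into a fully qualified class name and checks whether that class can be loaded.

```java
public class ReaderSchemaLookup {

    // Join namespace and name into a fully qualified name, as Avro full names are formed.
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    // Return true if a class with the given name can be loaded from the classpath.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Namespace and name taken from the schema quoted in the question.
        String writerName = fullName("io.confluent.connect.avro", "ConnectDefault");
        System.out.println(writerName + " on classpath: " + isOnClasspath(writerName));
    }
}
```

Unless the application ships a class with exactly that name, the lookup fails, which matches the "Could not find class io.confluent.connect.avro.ConnectDefault" message above.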
It looks like this can be solved by setting a schema name in the source connector with the SetSchemaMetadata Single Message Transform (SMT): https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect
transforms=setValueSchema
transforms.setValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.setValueSchema.schema.name=io.confluent.connect.avro.ConnectDefault
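Note that the connector configuration in the question already defines a transforms list (InsertKey, ExtractId), so the new transform probably needs to be appended to that list rather than replacing it. A possible combined fragment is sketched below. The schema name com.example.avro.TopicRecord is a hypothetical placeholder; the assumption is that it should match the full name of the generated SpecificRecord class the Streams application uses.

```properties
# Keep the existing key transforms and append the schema-name transform.
transforms=InsertKey,ExtractId,setValueSchema
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=id
transforms.setValueSchema.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
# Hypothetical name: replace with the full name of your generated Avro class.
transforms.setValueSchema.schema.name=com.example.avro.TopicRecord
```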
Upvotes: 2