Tobias Hermann

Reputation: 10936

Kafka Connect BigQuery Sink Connector requests incorrect subject names from the Schema Registry

While trying to use confluentinc/kafka-connect-bigquery on our Kafka (Avro) events, I run into the following error:

org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic domain.user to Avro: 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:125)
[...]
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 619
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'domain.user-key' not found.; error code: 40401
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:440)
[...]

And yes, there is no subject domain.user-key in the schema registry:

curl --silent -X GET http://avro-schema-registry.core-kafka.svc.cluster.local:8081/subjects | jq . | grep "domain\.user"
[...]
  "domain.user-com.acme.message_schema.type.domain.key.DefaultKey",
  "domain.user-com.acme.message_schema.domain.user.Key",
[...]

How can I make the connector use the right subject names?

My properties/connector.properties (I'm using the quickstart folder) looks as follows:

[...]
topics=domain.user
sanitizeTopics=true
autoUpdateSchemas=true
autoCreateTables=true
allowNewBigQueryFields=true
[...]

In the end, I want to use topics.regex=domain.* instead of topics=domain.user to capture all our domain-event topics, but with this I get the same type of error (just for a different subject).

Upvotes: 1

Views: 341

Answers (1)

OneCricketeer

Reputation: 191743

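You need to set key.converter.key.subject.name.strategy and value.converter.value.subject.name.strategy to io.confluent.kafka.serializers.subject.TopicRecordNameStrategy. By default the Avro converter uses TopicNameStrategy, which looks up the subjects <topic>-key and <topic>-value; that is why it asks for domain.user-key. TopicRecordNameStrategy looks up <topic>-<fully qualified record name> instead, which matches the subjects registered in your Schema Registry (for example domain.user-com.acme.message_schema.domain.user.Key).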
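
As a rough sketch, the relevant converter settings might look like the following. This assumes both keys and values use io.confluent.connect.avro.AvroConverter and that Connect talks to the same registry URL as the curl command in the question; where the lines belong (the worker properties file or the connector properties file) depends on where your converters are already configured, so treat the placement as an assumption.

```properties
# Assumption: keys and values are both Avro and use the Confluent AvroConverter.
key.converter=io.confluent.connect.avro.AvroConverter
# Assumption: same registry URL as in the curl command from the question.
key.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
# Look up key schemas under <topic>-<fully qualified record name>.
key.converter.key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://avro-schema-registry.core-kafka.svc.cluster.local:8081
# Look up value schemas under <topic>-<fully qualified record name>.
value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
```

Because the strategy derives the subject from each record's topic and schema name, the same settings should also apply when you switch from topics=domain.user to topics.regex=domain.*, provided all producers on those topics registered their schemas with the same strategy.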

Upvotes: 1
