Mohamed Ayman

Reputation: 1

Kafka JDBC Sink connector: plain JSON with Schema Registry

I have a Kafka cluster deployed with Strimzi, and I want to create a sink connector to replicate the data in its topics to an Oracle database. The data in the topics, as produced by the producer, is plain JSON like this:

{"ID":"348010961","TRANSACTION_REQUEST_TYPE_ID":"111"}.

Note that I don't have control over the producer.

I know that the JDBC Sink connector streams to a relational database, and relational databases have schemas, so the JDBC Sink connector requires a schema to be present for the data and will not work with this plain JSON as-is.
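If I could change the messages, my understanding is that the plain JsonConverter (org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=true) would accept an inline-schema envelope like the sketch below (field types guessed from my sample message), but since I can't touch the producer, this format is not available to me:

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      {"field": "ID", "type": "string", "optional": false},
      {"field": "TRANSACTION_REQUEST_TYPE_ID", "type": "string", "optional": true}
    ]
  },
  "payload": {"ID": "348010961", "TRANSACTION_REQUEST_TYPE_ID": "111"}
}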

So I deployed a Schema Registry image, and I managed to connect it to the brokers and register a schema.

But the problem is that I don't know exactly what schema to register, or which converter to use: Avro or JsonSchema.
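For concreteness, a JSON Schema describing the sample message would presumably look something like this (hand-written from the sample above; I don't know if this is the structure the connector expects):

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Transaction",
  "type": "object",
  "properties": {
    "ID": {"type": "string"},
    "TRANSACTION_REQUEST_TYPE_ID": {"type": "string"}
  },
  "required": ["ID"]
}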

Here is my connector configuration:

spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    topics: test1
    connection.url: 'xxxx'
    connection.user: xxx
    connection.password: 'xxxx'
    insert.mode: insert
    auto.create: true
    pk.mode: record_value
    pk.fields: ID
    key.converter: io.confluent.connect.json.JsonSchemaConverter
    key.converter.schema.registry.url: 'http://schema-registry-test:8081'
    key.converter.schema.registry.subject: my-schema-value
    value.converter: io.confluent.connect.json.JsonSchemaConverter
    value.converter.schema.registry.url: 'http://schema-registry-test:8081'
    value.converter.schema.registry.subject: my-schema-value
    value.converter.schema.registry.version: 1
    value.converter.schema.registry.id: 1
    value.converter.schemas.enable: true
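For reference, the connector and task state can also be checked through the Kafka Connect REST API (the service name and connector name below are placeholders for my setup):

curl http://my-connect-cluster-connect-api:8083/connectors/<connector-name>/status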

I found two issues in the logs. The first shows up in the connector logs:

org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic test1:
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:236)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:313)
    at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)
    ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

But whether I register a schema or not, I get the same error, so I am not sure the connector is even reaching the registry and validating the schema; I see the following INFO messages in the Schema Registry logs:

[2023-07-17 17:41:17,794] INFO [Consumer clientId=KafkaStore-reader-_schemas, groupId=schema-registry-schema-registry-8081] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-17 17:41:18,516] INFO [Schema registry clientId=sr-1, groupId=schema-registry] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)
[2023-07-17 17:41:40,576] INFO [Producer clientId=producer-1] Node -1 disconnected. (org.apache.kafka.clients.NetworkClient)

And these in the broker logs:

Dynamic member with unknown member id joins group schema-registry in Empty state. Created a new member id sr-1-8343ba38-46a2-4b36-81ec-ddb874b30ae5 and request the member to rejoin with this id. (kafka.coordinator.group.GroupCoordinator) [data-plane-kafka-request-handler-3]
2023-07-17 17:32:18,443 INFO [GroupCoordinator 0]: Preparing to rebalance group schema-registry in state PreparingRebalance with old generation 27 (__consumer_offsets-29) (reason: Adding new member sr-1-8343ba38-46a2-4b36-81ec-ddb874b30ae5 with group instance id None; client reason: rebalance failed due to MemberIdRequiredException)
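To rule out basic connectivity, the registry's standard REST endpoints can also be queried directly from inside the cluster (the subject name below is the one from my config):

curl http://schema-registry-test:8081/subjects
curl http://schema-registry-test:8081/subjects/my-schema-value/versions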

So my questions are:

1- What schema structure should I register in the Schema Registry to be valid for this plain JSON?

2- Should I use the Avro converter or the JsonSchema converter?

3- Is the problem with the registered schema, or is there another issue preventing the connector from reaching the Schema Registry and validating the schema?

Thanks in advance.

Upvotes: 0

Views: 670

Answers (1)

OneCricketeer

Reputation: 191671

"what converter to use, Avro or JsonSchema"

Both (also Protobuf) will work, since each has a schema.

But if you cannot modify the producer, you cannot use any of them.
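The reason is that all of the Confluent schema-aware converters (Avro, JSON Schema, Protobuf) expect every message to begin with the Confluent wire-format header, which a plain-JSON producer never writes; that is exactly what "Unknown magic byte!" is complaining about:

byte 0      magic byte, always 0x0
bytes 1-4   schema ID assigned by the registry (4-byte big-endian int)
bytes 5+    the serialized payload

Your messages begin with { (0x7B), so deserialization fails before any registered schema is even consulted; registering a schema by hand cannot fix this, which answers your question 3.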

If you could modify the producer (or consume the plain JSON and parse/convert it to a new topic) to use any of the above formats, a schema would automatically be registered in the registry.
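For example, Confluent Platform ships kafka-json-schema-console-producer, which validates each record, prepends the wire-format header, and registers the schema under test1-with-schema-value automatically (the topic name here is just an illustration; older versions take --broker-list instead of --bootstrap-server):

kafka-json-schema-console-producer \
  --bootstrap-server localhost:9092 \
  --topic test1-with-schema \
  --property schema.registry.url=http://schema-registry-test:8081 \
  --property value.schema='{"type":"object","properties":{"ID":{"type":"string"},"TRANSACTION_REQUEST_TYPE_ID":{"type":"string"}}}'

{"ID":"348010961","TRANSACTION_REQUEST_TYPE_ID":"111"}

A topic written this way is one the JsonSchemaConverter in your sink config can actually read.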

Upvotes: 0
