Reputation: 1041
It's driving me crazy as I'm trying to sink a kafka topic into a Postgres table. Here's my setup and I'm not sure what I'm doing wrong.
This is a typical message from the Kafka topic
{
"flightId": "5cbc7ad25732ab0004c51c45",
"recordedAt": "2022-03-26T18:17:11.356Z",
"device": "iOS",
"platform": "A5",
"vehicleId": "621c12a9b12161009865bc5d"
}
Below is my docker-compose.yaml
file
version: '3.7'
services:
connector:
image: custom-connector:latest
environment:
CONNECT_BOOTSTRAP_SERVERS: ${CONNECT_BOOTSTRAP_SERVERS}
CONNECT_GROUP_ID: "kafka-connect-group-id"
CONNECT_CONFIG_STORAGE_TOPIC: "kafka-connect-config"
CONNECT_OFFSET_STORAGE_TOPIC: "kafka-connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "kafka-connect-status"
CONNECT_REST_ADVERTISED_HOST_NAME: ${CONNECT_REST_ADVERTISED_HOST_NAME}
CONNECT_SECURITY_PROTOCOL: ${CONNECT_SECURITY_PROTOCOL}
CONNECT_SASL_MECHANISM: ${CONNECT_SASL_MECHANISM}
CONNECT_REST_PORT: 8083
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_REQUEST_TIMEOUT_MS: "20000"
CONNECT_RETRY_BACKOFF_MS: "500"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
CONNECT_CONSUMER_SECURITY_PROTOCOL: ${CONNECT_SECURITY_PROTOCOL}
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
CONNECT_PRODUCER_SECURITY_PROTOCOL: ${CONNECT_SECURITY_PROTOCOL}
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/u01/connectors
CONNECT_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
CONNECT_CONSUMER_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
CONNECT_PRODUCER_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
CONNECT_VALUE_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_KEY_IGNORE: 'true'
ports:
- "8083:8083"
schema-registry:
image: "confluentinc/cp-schema-registry:5.2.1"
ports:
- '8081:8081'
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: SASL_SSL://${CONNECT_BOOTSTRAP_SERVERS}
SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SASL_SSL
SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG: ${JAAS_CONFIG}
SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM: PLAIN
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
My connector's config file when sending a PUT request to Kafka-connect.
{
"name": "test-postgres-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://******:5432/db",
"connection.user": "******",
"connection.password": "******",
"topics": "test-topic",
"table.name.format": "kafka_sink_test",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.ignore": "true",
"name": "test-postgres-sink-connector"
},
"tasks": [
{
"connector": "test-postgres-sink-connector",
"task": 0
}
],
"type": "sink"
}
From the logs, kafka-connect is complaining:
ERROR WorkerSinkTask{id=test-postgres-sink-connector-0} Error converting message value in topic 'test-topic' partition 2 at offset 0 and timestamp 1647927842369: Converting byte[] to Kafka Connect data failed due to serialization error of topic test-topic: (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic test-topic:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:177)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:235)
at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
... 18 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:112)
... 21 more
[2022-03-26 18:11:31,779] ERROR WorkerSinkTask{id=test-postgres-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic test-topic:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:119)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:516)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:177)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:235)
at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:165)
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:108)
... 18 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:250)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:112)
... 21 more
[2022-03-26 18:11:31,780] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
[2022-03-26 18:11:31,781] INFO [Consumer clientId=connector-consumer-test-postgres-sink-connector-0, groupId=test-postgres-sink-connector] Revoke previously assigned partitions test-topic-0, test-topic-1, test-topic-2, test-topic-3, test-topic-4, test-topic-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-03-26 18:11:31,781] INFO [Consumer clientId=test-postgres-sink-connector-0, groupId=test-postgres-sink-connector] Member test-postgres-sink-connector-0-89225797-cac6-41f5-9373-bbd16bc8a1b6 sending LeaveGroup request to coordinator b2-pkc-2396y.us-east-1.aws.confluent.cloud:9092 (id: 2147483645 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-03-26 18:11:31,783] INFO [Consumer clientId=test-postgres-sink-connector-0, groupId=test-postgres-sink-connector] Resetting generation due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-03-26 18:11:31,783] INFO [Consumer clientId=connector-test-postgres-sink-connector-0, groupId=connect-test-postgres-sink-connector] Request joining group due to: consumer pro-actively leaving the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2022-03-26 18:11:32,284] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-03-26 18:11:32,285] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-03-26 18:11:32,286] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-03-26 18:11:32,316] INFO App info kafka.consumer for connector-test-postgres-sink-connector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
Upvotes: 2
Views: 2667
Reputation: 191671
This is a typical message from the Kafka topic
Your data has no schema, so you cannot use JsonSchemaConverter
. Plus, the JDBC Sink requires a schema. JDBC Sink Deep Dive
Since it is has no schema, and specifically didn't use the JSONSchema serializer with the Confluent Schema Registry, then you are getting Unknown magic byte!
error from that Converter. Instead, you'll need to instead use the regular JSONConverter
class (not prefixed with io.confluent
, but rather org.apache.kafka
). But as stated, value.converter.schemas.enable
must be true
.
More info - Converter Deep Dive
Upvotes: 2