Jeffery-the-newbie
Jeffery-the-newbie

Reputation: 41

Error configuring an instance of JdbcSinkConnectorConfig with Postgres-sink-connector

I want to run CDC (Oracle - Kafka - Postgres) using Debezium. I set up kafka.yaml

services:
  oracle:
    image: gvenzl/oracle-xe:21.3.0
    container_name: oracle
    ports:
      - "1522:1521"
    environment:
      ORACLE_PASSWORD: "sys_password"
    volumes:
      - ./oradata:/opt/oracle/oradata
      - ./ora-setup-scripts:/docker-entrypoint-initdb.d

  kafka:
    image: confluentinc/cp-kafka:7.8.0
    container_name: kafka
    ports:
      - 9092:9092
      - 9093:9093
    volumes:
      - ./kafka-data:/var/lib/kafka/data
      - ./kafka-logs:/var/lib/kafka/logs
      - ./kafka-setup/entrypoint.sh:/usr/bin/entrypoint.sh
      - ./kafka-setup/register-connectors.sh:/usr/bin/register-connectors.sh
      - ./connectors:/kafka/connectors
    entrypoint: ["/bin/bash", "/usr/bin/entrypoint.sh"]
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://localhost:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_SCHEMA_REGISTRY_URL: "schema-registry:8081"
      # NEEDED TO BE REMOVED IF CLASTER IS ALREADY CREATED
      CLUSTER_ID: 731692aa-ecae-4ef9-9432-e3fbc3b7e0d9

  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    ports:
      - "8085:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka

  schema-registry:
   image: confluentinc/cp-schema-registry:7.8.0
   hostname: schema-registry
   container_name: schema-registry
   depends_on:
     - kafka
   ports:
     - 8081:8081
   environment:
     SCHEMA_REGISTRY_HOST_NAME: schema-registry
     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
     SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 # http://schema-registry:8081,http://localhost:8081
 
  debezium-connect:
    container_name: debezium-connect
    build: .
    image: debezium-connect-with-avro-connectors:latest
    ports:
      - 8083:8083
    depends_on:
      - kafka
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      CONNECT_REST_ADVERTISED_HOST_NAME: debezium-connect
      CONNECT_PLUGIN_PATH: /kafka/connect,/kafka/connect/plugins/confluent
      CONNECTOR_CONFIG_DIR: /kafka/connectors
      CONNECT_REST_PORT: 8083

    volumes:
      - ./connectors:/kafka/connectors
      - ./kafka-setup/register-connectors.sh:/usr/bin/register-connectors.sh
      - ./connect-data:/var/lib/kafka-connect

  debezium-ui:
    container_name: debezium-ui
    image: quay.io/debezium/debezium-ui:2.5
    ports:
      - 8087:8080
    environment:
      KAFKA_CONNECT_URIS: http://debezium-connect:8083
    depends_on:
      - debezium-connect

  postgres:
    image: postgres:14
    container_name: postgres
    ports:
      - 5433:5432
    environment:
      POSTGRES_USER: postgres_user
      POSTGRES_PASSWORD: postgres_password
      POSTGRES_DB: postgres_cdc
    volumes:
      - ./postgres-data:/var/lib/postgresql/data

I also set up oracle-connector.sh (which worked fine)

{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "oracle",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "debezium_common_user_password",
    "database.dbname": "XE",
    "database.pdb.name": "XEPDB1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.oracle",
    "snapshot.mode": "initial",
    "topic.prefix": "oracle",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    
    "value.converter.schema.registry.url": "http://your-schema-registry:8081",
    "key.converter.schema.registry.url": "http://your-schema-registry:8081",

    "log.mining.strategy": "online_catalog",
    "archive.destination.name": "LOG_ARCHIVE_DEST_1",
    "schema.history.internal.kafka.topic": "debezium-schema-history",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",

    "transforms": "topicRename",
    "transforms.topicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRename.regex": "oracle\\.(.*)",
    "transforms.topicRename.replacement": "$1"

  }
}

And postgres-connector.json

{
  "name": "postgres-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "DEBEZIUM.TEST1",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres_cdc",
    "connection.user": "postgres_user",
    "connection.password": "postgres_password",
    "insert.mode": "upsert",
    "auto.create": "true",
    "auto.evolve": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",

    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    
    "value.converter.schema.registry.url": "http://your-schema-registry:8081",
    "key.converter.schema.registry.url": "http://your-schema-registry:8081"
  }
}

So, my Postgres sink connector doesn't work. When I try to register it, I get

"org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of JdbcSinkConnectorConfig; check the logs for details\n\tat io.debezium.connector.jdbc.JdbcSinkConnectorConfig.validate(JdbcSinkConnectorConfig.java:406)\n\tat io.debezium.connector.jdbc.JdbcSinkConnectorTask.start(JdbcSinkConnectorTask.java:97)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:324)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:176)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n"

There are also my dockerfile which I used to build a Debezium image with some plugins

FROM quay.io/debezium/connect:3.0.7.Final

# Create the plugins directory
RUN mkdir -p /kafka/connect/plugins/confluent


# Download the necessary Confluent libraries
RUN curl -L https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/7.0.1/kafka-connect-avro-converter-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-connect-avro-converter-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/7.0.1/kafka-connect-avro-data-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-connect-avro-data-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/7.0.1/kafka-avro-serializer-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-avro-serializer-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/7.0.1/kafka-schema-serializer-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-schema-serializer-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-schema-converter/7.0.1/kafka-schema-converter-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-schema-converter-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.1/kafka-schema-registry-client-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-schema-registry-client-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/common-config/7.0.1/common-config-7.0.1.jar -o /kafka/connect/plugins/confluent/common-config-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/common-utils/7.0.1/common-utils-7.0.1.jar -o /kafka/connect/plugins/confluent/common-utils-7.0.1.jar

# Download other general libraries required for Avro and Kafka support
RUN curl -L https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar -o /kafka/connect/plugins/confluent/avro-1.10.2.jar && \
    curl -L https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar -o /kafka/connect/plugins/confluent/guava-30.1.1-jre.jar && \
    curl -L https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar -o /kafka/connect/plugins/confluent/commons-compress-1.21.jar && \
    curl -L https://repo1.maven.org/maven2/com/google/code/findbugs/annotations/3.0.1/annotations-3.0.1.jar -o /kafka/connect/plugins/confluent/annotations-3.0.1.jar && \
    curl -L https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar -o /kafka/connect/plugins/confluent/slf4j-api-1.7.30.jar && \
    curl -L https://repo1.maven.org/maven2/org/snakeyaml/snakeyaml/1.29/snakeyaml-1.29.jar -o /kafka/connect/plugins/confluent/snakeyaml-1.29.jar && \
    curl -L https://repo1.maven.org/maven2/org/swagger/swagger-annotations/1.6.0/swagger-annotations-1.6.0.jar -o /kafka/connect/plugins/confluent/swagger-annotations-1.6.0.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.12.4/jackson-databind-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-databind-2.12.4.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.12.4/jackson-core-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-core-2.12.4.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.12.4/jackson-annotations-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-annotations-2.12.4.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-csv/2.12.4/jackson-dataformat-csv-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-dataformat-csv-2.12.4.jar

I tried to create the same tables in Oracle and Postgres with appropriate datatypes (which seems odd considering the fact that I set "auto.create and "auto.evolve" to true). I think it might be because schema-registry (especially, its conversion types) but I tried JSON, as well as Avro. I also tried different version of Posgres which obviously didn't help. The question is how to register Postgres sink connector. Thank you!

Upvotes: 0

Views: 32

Answers (0)

Related Questions