Reputation: 41
I want to run CDC (Oracle → Kafka → Postgres) using Debezium. I set up kafka.yaml:
services:
  oracle:
    image: gvenzl/oracle-xe:21.3.0
    container_name: oracle
    ports:
      - "1522:1521"
    environment:
      ORACLE_PASSWORD: "sys_password"
    volumes:
      - ./oradata:/opt/oracle/oradata
      - ./ora-setup-scripts:/docker-entrypoint-initdb.d
  kafka:
    image: confluentinc/cp-kafka:7.8.0
    container_name: kafka
    ports:
      - 9092:9092
      - 9093:9093
    volumes:
      - ./kafka-data:/var/lib/kafka/data
      - ./kafka-logs:/var/lib/kafka/logs
      - ./kafka-setup/entrypoint.sh:/usr/bin/entrypoint.sh
      - ./kafka-setup/register-connectors.sh:/usr/bin/register-connectors.sh
      - ./connectors:/kafka/connectors
    entrypoint: ["/bin/bash", "/usr/bin/entrypoint.sh"]
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://localhost:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 1
      KAFKA_SCHEMA_REGISTRY_URL: "schema-registry:8081"
      # REMOVE IF THE CLUSTER HAS ALREADY BEEN CREATED
      CLUSTER_ID: 731692aa-ecae-4ef9-9432-e3fbc3b7e0d9
  kafka-ui:
    image: provectuslabs/kafka-ui:v0.7.2
    container_name: kafka-ui
    ports:
      - "8085:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka
  schema-registry:
    image: confluentinc/cp-schema-registry:7.8.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 # http://schema-registry:8081,http://localhost:8081
  debezium-connect:
    container_name: debezium-connect
    build: .
    image: debezium-connect-with-avro-connectors:latest
    ports:
      - 8083:8083
    depends_on:
      - kafka
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      CONNECT_REST_ADVERTISED_HOST_NAME: debezium-connect
      CONNECT_PLUGIN_PATH: /kafka/connect,/kafka/connect/plugins/confluent
      CONNECTOR_CONFIG_DIR: /kafka/connectors
      CONNECT_REST_PORT: 8083
    volumes:
      - ./connectors:/kafka/connectors
      - ./kafka-setup/register-connectors.sh:/usr/bin/register-connectors.sh
      - ./connect-data:/var/lib/kafka-connect
  debezium-ui:
    container_name: debezium-ui
    image: quay.io/debezium/debezium-ui:2.5
    ports:
      - 8087:8080
    environment:
      KAFKA_CONNECT_URIS: http://debezium-connect:8083
    depends_on:
      - debezium-connect
  postgres:
    image: postgres:14
    container_name: postgres
    ports:
      - 5433:5432
    environment:
      POSTGRES_USER: postgres_user
      POSTGRES_PASSWORD: postgres_password
      POSTGRES_DB: postgres_cdc
    volumes:
      - ./postgres-data:/var/lib/postgresql/data
I also set up oracle-connector.json (which worked fine):
{
  "name": "oracle-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.hostname": "oracle",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "debezium_common_user_password",
    "database.dbname": "XE",
    "database.pdb.name": "XEPDB1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.oracle",
    "snapshot.mode": "initial",
    "topic.prefix": "oracle",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://your-schema-registry:8081",
    "key.converter.schema.registry.url": "http://your-schema-registry:8081",
    "log.mining.strategy": "online_catalog",
    "archive.destination.name": "LOG_ARCHIVE_DEST_1",
    "schema.history.internal.kafka.topic": "debezium-schema-history",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "transforms": "topicRename",
    "transforms.topicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRename.regex": "oracle\\.(.*)",
    "transforms.topicRename.replacement": "$1"
  }
}
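I can verify the source side through the Connect REST API (port 8083 is published to the host in the compose file); a healthy connector reports "state": "RUNNING":

# Check the Oracle source connector's state; both the connector and its
# task should report "state": "RUNNING" when healthy
curl -s http://localhost:8083/connectors/oracle-connector/status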
And postgres-connector.json:
{
  "name": "postgres-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "DEBEZIUM.TEST1",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres_cdc",
    "connection.user": "postgres_user",
    "connection.password": "postgres_password",
    "insert.mode": "upsert",
    "auto.create": "true",
    "auto.evolve": "true",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://your-schema-registry:8081",
    "key.converter.schema.registry.url": "http://your-schema-registry:8081"
  }
}
So, my Postgres sink connector doesn't work. When I try to register it, I get:

org.apache.kafka.connect.errors.ConnectException: Error configuring an instance of JdbcSinkConnectorConfig; check the logs for details
    at io.debezium.connector.jdbc.JdbcSinkConnectorConfig.validate(JdbcSinkConnectorConfig.java:406)
    at io.debezium.connector.jdbc.JdbcSinkConnectorTask.start(JdbcSinkConnectorTask.java:97)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerTask.doStart(WorkerTask.java:176)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:281)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:238)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
There is also my Dockerfile, which I used to build a Debezium Connect image with some extra plugins:
FROM quay.io/debezium/connect:3.0.7.Final

# Create the plugins directory
RUN mkdir -p /kafka/connect/plugins/confluent

# Download the necessary Confluent libraries
RUN curl -L https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/7.0.1/kafka-connect-avro-converter-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-connect-avro-converter-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-data/7.0.1/kafka-connect-avro-data-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-connect-avro-data-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-avro-serializer/7.0.1/kafka-avro-serializer-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-avro-serializer-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/7.0.1/kafka-schema-serializer-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-schema-serializer-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-schema-converter/7.0.1/kafka-schema-converter-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-schema-converter-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.0.1/kafka-schema-registry-client-7.0.1.jar -o /kafka/connect/plugins/confluent/kafka-schema-registry-client-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/common-config/7.0.1/common-config-7.0.1.jar -o /kafka/connect/plugins/confluent/common-config-7.0.1.jar && \
    curl -L https://packages.confluent.io/maven/io/confluent/common-utils/7.0.1/common-utils-7.0.1.jar -o /kafka/connect/plugins/confluent/common-utils-7.0.1.jar

# Download other general libraries required for Avro and Kafka support
RUN curl -L https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar -o /kafka/connect/plugins/confluent/avro-1.10.2.jar && \
    curl -L https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar -o /kafka/connect/plugins/confluent/guava-30.1.1-jre.jar && \
    curl -L https://repo1.maven.org/maven2/org/apache/commons/commons-compress/1.21/commons-compress-1.21.jar -o /kafka/connect/plugins/confluent/commons-compress-1.21.jar && \
    curl -L https://repo1.maven.org/maven2/com/google/code/findbugs/annotations/3.0.1/annotations-3.0.1.jar -o /kafka/connect/plugins/confluent/annotations-3.0.1.jar && \
    curl -L https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar -o /kafka/connect/plugins/confluent/slf4j-api-1.7.30.jar && \
    curl -L https://repo1.maven.org/maven2/org/yaml/snakeyaml/1.29/snakeyaml-1.29.jar -o /kafka/connect/plugins/confluent/snakeyaml-1.29.jar && \
    curl -L https://repo1.maven.org/maven2/io/swagger/swagger-annotations/1.6.0/swagger-annotations-1.6.0.jar -o /kafka/connect/plugins/confluent/swagger-annotations-1.6.0.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.12.4/jackson-databind-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-databind-2.12.4.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-core/2.12.4/jackson-core-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-core-2.12.4.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.12.4/jackson-annotations-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-annotations-2.12.4.jar && \
    curl -L https://repo1.maven.org/maven2/com/fasterxml/jackson/dataformat/jackson-dataformat-csv/2.12.4/jackson-dataformat-csv-2.12.4.jar -o /kafka/connect/plugins/confluent/jackson-dataformat-csv-2.12.4.jar
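To double-check that the custom image actually loads both plugins, the worker's plugin list can be queried (standard Connect endpoint):

# Both io.debezium.connector.oracle.OracleConnector and
# io.debezium.connector.jdbc.JdbcSinkConnector should be listed here
curl -s http://localhost:8083/connector-plugins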
I tried creating the same tables in Oracle and Postgres with the appropriate datatypes by hand (which seems odd, considering that I set "auto.create" and "auto.evolve" to true).
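To see whether auto.create produced anything at all on the sink side, the database can be inspected directly (container name and credentials as in the compose file):

# List tables in the sink database; with auto.create=true the target table
# for topic DEBEZIUM.TEST1 should appear after the first record arrives
docker exec -it postgres psql -U postgres_user -d postgres_cdc -c '\dt'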
I think it might be caused by the Schema Registry (specifically, its type conversion), but I tried JSON as well as Avro converters. I also tried different versions of Postgres, which obviously didn't help.
The question is: how do I get the Postgres sink connector to register? Thank you!
Upvotes: 0
Views: 32