prashant.kr.mod

Reputation: 1702

Debezium Oracle connector CDC with table having multiple partitions

My requirement is an Oracle source connector feeding a PostgreSQL JDBC sink connector.
My table CUSTOMER has 3 partitions: [p0, p1, pmax].

I want to do CDC for partition P0 only. So far I have not been able to whitelist exactly CUSTOMER:P0.

The following is the configuration for the Oracle source connector:

{
  "name": "inventory-connector",
  "config": {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "topic.prefix" : "server1",
    "tasks.max" : "1",
    "database.server.name" : "ORCLPDB",
    "database.hostname" : "192.168.0.103",
    "database.port" : "1521",
    "database.user" : "C##DBZUSER",
    "database.password" : "oracle",
    "database.dbname" : "XE",
    "database.pdb.name" : "XEPDB1",
    "database.connection.adapter" : "logminer",
    "database.history.kafka.bootstrap.servers" : "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "table.include.list": "DEBEZIUM\\.PLAYERS",

    "transforms": "routePartition,unwrap,DATEPARTITION",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",

    "transforms.routePartition.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.routePartition.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.routePartition.replacement": "$3",


    "transforms.DATEPARTITION.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.DATEPARTITION.target.type":"string",
    "transforms.DATEPARTITION.field":"DATEPARTITION",
    "transforms.DATEPARTITION.format":"dd-MM-yyyy" ,
    "time.precision.mode":"connect"

  }
}
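To make the routePartition transform above concrete, here is a minimal Python sketch of what its regex and replacement do to a topic name. It assumes the usual Debezium topic naming of prefix.schema.table (so server1.DEBEZIUM.PLAYERS for this config) and only imitates RegexRouter, which itself uses Java regex syntax; the `route` helper is a hypothetical name for illustration, not part of Kafka Connect.

```python
import re

# Pattern copied from "transforms.routePartition.regex" in the config above.
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")
# The config uses the Java-style replacement "$3"; Python spells group 3 as "\3".
REPLACEMENT = r"\3"


def route(topic: str) -> str:
    """Imitate RegexRouter: rewrite the topic only if the whole name matches."""
    if ROUTE_REGEX.fullmatch(topic) is None:
        return topic  # non-matching topic names pass through unchanged
    return ROUTE_REGEX.sub(REPLACEMENT, topic, count=1)


if __name__ == "__main__":
    # Assumed topic name: <topic.prefix>.<schema>.<table>
    print(route("server1.DEBEZIUM.PLAYERS"))  # PLAYERS
    print(route("server1"))                   # server1
```

Under that naming assumption the topic carries only prefix, schema, and table, so this transform works at table level and does not see which Oracle partition (P0, P1, PMAX) a row came from.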

The following is the configuration for the PostgreSQL sink connector:

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "PLAYERS",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap,routePartition",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "true",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.mode": "record_key",
        "table.name.format": "PLAYERS",
        "transforms.routePartition.type": "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.routePartition.regex": ".*",
        "transforms.routePartition.replacement": "PLAYERS-0"
    }
}

I can find documentation on CDC for partitioned tables with Debezium for PostgreSQL here:

Partitioned PostgreSQL tables: https://debezium.io/documentation/reference/stable/transformations/topic-routing.html

But I could not find any such documentation for the Debezium Oracle connector.

Any help will be greatly appreciated!

Upvotes: 0

Views: 159

Answers (0)
