BenjaminC

Reputation: 13

Unable to extract field with "ExtractField" SMT transformation in Oracle database

I'm not able to apply the "ExtractField" SMT transformation to extract a field from the key struct into a simple long value with an Oracle database. It works fine with a Postgres database.

I tried using the "ReplaceField" SMT to rename the key and it works fine. I suspect a problem with how the class "org.apache.kafka.connect.transforms.ExtractField" handles the schema when getting the field; schema handling seems to work differently between "ReplaceField" and "ExtractField".

Oracle database version: Oracle Database 19c Enterprise Edition Release 19.0.0.0.0 - Production Version 19.8.0.0.0
Debezium connect: 1.6
Kafka version: 2.7.0
Instant Client basic (Oracle client and drivers): 21.3.0.0.0

I get an "Unknown field: ID_MYTABLE" error:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.IllegalArgumentException: Unknown field: ID_MYTABLE
    at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more

Here is my configuration of my Kafka connector:

{
  "name": "oracle-connector",  
  "config": {   
    "connector.class": "io.debezium.connector.oracle.OracleConnector", 
    "tasks.max": "1", 
    "database.server.name": "serverName", 
    "database.user": "c##dbzuser", 
    "database.password": "dbz", 
    "database.url": "jdbc:oracle:thin:...", 
    "database.dbname": "dbName", 
    "database.pdb.name": "PDBName", 
    "database.connection.adapter": "logminer", 
    "database.history.kafka.bootstrap.servers": "kafka:9092", 
    "database.history.kafka.topic": "schema-changes.data", 
    "schema.include.list": "mySchema", 
    "table.include.list": "mySchema.myTable", 
    "log.mining.strategy": "online_catalog", 
    "snapshot.mode": "initial", 
    "key.converter": "org.apache.kafka.connect.json.JsonConverter", 
    "key.converter.schemas.enable": "false", 
    "value.converter": "io.confluent.connect.avro.AvroConverter", 
    "value.converter.schemas.enable": "true", 
    "value.converter.schema.registry.url": "http://schema-registry:8081", 
    "transforms": "unwrap,route,extractField",
    "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key", 
    "transforms.extractField.field": "ID_MYTABLE", 
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", 
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", 
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", 
    "transforms.route.replacement": "$1_$2_$3" 
  } 
}
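For reference, all "ExtractField$Key" does is replace a struct key such as {"ID_MYTABLE": 42} with the value of the named field, and it throws the "Unknown field" error seen above whenever a record's key lacks that field. A minimal sketch of this behavior in Python (the function name and schemaless-dict handling are illustrative, not the actual Kafka Connect implementation):

```python
def extract_field(key, field):
    """Mimic the behavior of Kafka Connect's ExtractField$Key
    on a schemaless key represented as a dict (illustrative sketch)."""
    if key is None:
        return None  # a null key passes through unchanged
    if field not in key:
        # Same failure mode as the stack trace above: the transform
        # is applied to a record whose key has no such field.
        raise ValueError(f"Unknown field: {field}")
    return key[field]

# A normal change event key is reduced to the plain long value:
print(extract_field({"ID_MYTABLE": 42}, "ID_MYTABLE"))  # -> 42
```

This is why the transform can work for table change events yet fail on other records the connector emits, as the answer below explains.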

Upvotes: 0

Views: 671

Answers (1)

dynamitem

Reputation: 1669

By default, when you configure SMTs for any connector, including Debezium, the transformation is applied to every record that the connector emits. That includes change event messages that might not contain the retrieved data and might lack the necessary fields. To solve this, you need to apply your SMTs selectively, to only a specific subset of the change event messages Debezium generates, using SMT predicates.

The official documentation is located here.

In your specific case, you could apply the SMT only to the output topic for that specific database table; it would look something like this:

# Create a predicate that matches your output 
predicates: topicNameMatch
predicates.topicNameMatch.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
predicates.topicNameMatch.pattern: *output topic name goes here*

# Your logic to extract the field from the key
transforms.extractField.type: org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractField.field: ID_MYTABLE

# This references the predicate above
transforms.extractField.predicate: topicNameMatch

There are other predicates located in the documentation listed above if the topic name matching doesn't work for you.
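Translated into the JSON connector configuration from the question, the predicate wiring would look roughly like this. Note that the topic pattern below is an assumption: since the "route" transform runs before "extractField", the predicate sees the already-rewritten topic name, which with that RegexRouter replacement would be something like serverName_mySchema_myTable.

```json
{
  "predicates": "topicNameMatch",
  "predicates.topicNameMatch.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
  "predicates.topicNameMatch.pattern": "serverName_mySchema_myTable",
  "transforms.extractField.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractField.field": "ID_MYTABLE",
  "transforms.extractField.predicate": "topicNameMatch"
}
```

These keys would be merged into the existing "config" object alongside the other "transforms.*" entries.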

Upvotes: 0
