Ravi
Ravi

Reputation: 8209

Debezium Single Message filter

I'm using the following Debezium configuration and trying to configure a single message transform (SMT) as shown below. I'm following the filtering documentation at https://debezium.io/documentation/reference/stable/transformations/filtering.html to set this up. I also have the required jars in the debezium-connector-mysql folder. How can I solve the error shown below?

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "driver.connectionTimeZone": "UTC",
    "tasks.max": "1",
    "database.history.kafka.topic": "dbhistory.dev_demo_db_log",
    "transforms": "filter, changes",
    "include.schema.changes": "true",
    "topic.prefix": "CDC_DEMO",
    "decimal.handling.mode": "double",
    "schema.history.internal.kafka.topic": "schemahistory.dev_demo_db_log",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.user": "root",
    "database.server.id": "2342",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.server.name": "dev_demo_db",
    "schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
    "transforms.changes.type": "io.debezium.transforms.ExtractChangedRecordState",
    "database.port": "3306",
    "key.converter.schemas.enable": "false",
    "database.hostname": "localhost",
    "database.password": "root",
    "value.converter.schemas.enable": "false",
    "name": "debezium-demo-connector",
    "table.include.list": "debezium_demo.User",
    "database.include.list": "debezium_demo",
    "transforms.changes.header.changed.name": "Changed",
    "transforms.filter.type": "io.debezium.transforms.Filter",
    "transforms.filter.language": "jsr223.groovy",
    "transforms.filter.condition": "value.op == 'u'",
    "snapshot.mode": "schema_only"
}

I keep getting the following error:

Caused by: javax.script.ScriptException: org.apache.kafka.connect.errors.DataException: op is not a valid field name
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:334)
    at org.codehaus.groovy.jsr223.GroovyCompiledScript.eval(GroovyCompiledScript.java:72)
    at java.scripting/javax.script.CompiledScript.eval(CompiledScript.java:93)
    at io.debezium.transforms.scripting.Jsr223Engine.eval(Jsr223Engine.java:107)
    ... 20 more
Caused by: org.apache.kafka.connect.errors.DataException: op is not a valid field name
    at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
    at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
    at org.codehaus.groovy.vmplugin.v8.IndyInterface.fromCache(IndyInterface.java:321)
    at Script13.run(Script13.groovy:1)
    at org.codehaus.groovy.jsr223.GroovyScriptEngineImpl.eval(GroovyScriptEngineImpl.java:331)
    ... 23 more

Upvotes: 1

Views: 273

Answers (0)

Related Questions