mjahr

How to use Single Message Transforms with Kafka Connect JDBC Source Connector and multiple tables?

I want to set the message key when importing tables with the Kafka Connect JDBC Source Connector.

How can Single Message Transforms (SMTs) in a Kafka Connect source connector be targeted at the right fields when the JDBC connector is configured to read from multiple tables? SMTs need a column name, which may differ from table to table.

I don't see a way to filter SMT definitions by table name or anything similar. The configuration sample below works fine because it covers only one table.

But what should I do if there are several tables, e.g. User, Order, Product?

"table.whitelist": "User",
"transforms": "createKey,extract",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "user_id",
"transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract.field": "user_id"

When a worker task with this configuration encounters a table without a user_id column, it crashes and remains in the FAILED state:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 11 more

This is plausible, since there seems to be no way to scope a transform by table or target topic, or is there? I would expect some way to restrict a transform to a given table or topic, e.g. something like

transforms.<topic-name>.createKey.type

Am I missing something or is it a Connect restriction?

Answers (1)

Giorgos Myrianthous

It is not possible to apply SMTs to specific topics only, because transforms are a connector-level configuration: they are applied to every message the connector processes.

I would recommend creating a separate connector for each table (and therefore each topic), so that each connector's SMTs only ever see the topic they were written for.
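A minimal sketch of the one-connector-per-table approach, written in Python purely for illustration. It generates a separate JDBC source connector config for each table, so each connector's ValueToKey/ExtractField transforms reference only that table's key column. Only `User`/`user_id` comes from the question; the `order_id` and `product_id` columns, the connection URL, the connector names and `"mode": "bulk"` are assumptions you would replace with your own values.

```python
import json

# Hypothetical table -> key-column mapping. Only "User" -> "user_id" comes from
# the question; the Order/Product column names are assumptions.
KEY_COLUMNS = {
    "User": "user_id",
    "Order": "order_id",
    "Product": "product_id",
}

# Placeholder JDBC URL (assumption): point this at your own database.
CONNECTION_URL = "jdbc:postgresql://localhost:5432/shop"


def connector_for_table(table: str, key_column: str) -> dict:
    """Build one JDBC source connector whose SMTs only ever see `table`."""
    return {
        "name": f"jdbc-source-{table.lower()}",  # hypothetical naming scheme
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": CONNECTION_URL,
            "mode": "bulk",  # assumption: simplest mode, no incrementing column needed
            "table.whitelist": table,
            # Same transform chain as in the question, but the field name is
            # per table, so ValueToKey never meets a record lacking the column.
            "transforms": "createKey,extract",
            "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.createKey.fields": key_column,
            "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extract.field": key_column,
        },
    }


def all_connectors(key_columns: dict) -> list:
    """Return one connector payload per table."""
    return [connector_for_table(table, col) for table, col in key_columns.items()]


if __name__ == "__main__":
    # Each payload has the {"name": ..., "config": {...}} shape that the
    # Kafka Connect REST API accepts on POST /connectors.
    for payload in all_connectors(KEY_COLUMNS):
        print(json.dumps(payload, indent=2))
```

Each printed payload can then be submitted separately to the Connect REST API, giving one connector, and one set of transforms, per table. The trade-off is more connectors to manage, in exchange for transforms that can never be applied to a table they were not written for.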
