Jehan Bruggeman

Reputation: 353

Using a custom converter with Kafka Connect?

I'm trying to use a custom converter with Kafka Connect and I cannot seem to get it right. I'm hoping someone has experience with this and could help me figure it out!

Initial situation

What happens?

When the connectors start, they correctly load the jars and find the custom converter. Indeed, this is what I see in the logs:

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)

I then POST a JSON config to one of the connector nodes to create my connector:

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}

And receive the following reply:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

What am I missing?

If I try running Kafka Connect standalone, the error message is the same.

Has anybody faced this already? What am I missing?

Upvotes: 3

Views: 8548

Answers (2)

Bert

Reputation: 930

Original answer:

To make the answer provided by Jehan Bruggeman more concrete: if you use the standard Debezium container image (quay.io/debezium/connect) and the PostgreSQL connector, then map your JAR into the folder /kafka/connect/debezium-connector-postgres, for instance with this Docker Compose snippet:

  debezium:
    container_name: debezium
    image: quay.io/debezium/connect:2.1.1.Final
    ports:
      - "8083:8083"
    environment:
      :
      :
    links:
      - "kafka"
      - "db"
    configs:
      - source: dbz-custom-smt-jar
        target: /kafka/connect/debezium-connector-postgres/custom-smt-0.0.1-SNAPSHOT.jar

configs:
  dbz-custom-smt-jar:
    file: ./dbz-extra-classes/custom-smt-0.0.1-SNAPSHOT.jar

No need to adjust the plugin.path property.

Updated answer: The Kafka issue KAFKA-6007 is resolved, so there is no longer any need to add the SMT classes under the connector. The SMT classes can now be added as a plugin of their own.

If you use the standard Debezium container image (quay.io/debezium/connect), then map your JAR/JARs into a folder like /kafka/connect/dbz-custom-smt, for instance like this Docker compose snippet:

  debezium:
    container_name: debezium
    image: quay.io/debezium/connect:2.1.1.Final
    ports:
      - "8083:8083"
    environment:
      :
      :
    links:
      - "kafka"
      - "db"
    configs:
      - source: dbz-custom-smt
        target: /kafka/connect/dbz-custom-smt

configs:
  dbz-custom-smt:
    file: ./dbz-extra-classes
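With that mapping in place, the jar ends up in its own top-level folder under the plugin path, alongside the connector folders. A layout sketch (folder names as in the compose snippet above; jar names illustrative):

```
/kafka/connect/
├── debezium-connector-postgres/
│   └── ... (the connector's own jars)
└── dbz-custom-smt/
    └── custom-smt-0.0.1-SNAPSHOT.jar
```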

I've created a small GitHub project with a working example of each approach. It was created to reproduce KAFKA-6007, so it has a 'working' and a 'failing' scenario, but it turns out that nowadays both scenarios work.

Upvotes: 1

Jehan Bruggeman

Reputation: 353

OK, I found the solution thanks to Philip Schmitt on the Kafka Users mailing list.

He mentioned this issue: https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007, which is indeed the problem I am facing.

To quote him:

To test this, I simply copied my SMT jar to the folder of the connector I was using and adjusted the plugin.path property.

Indeed, I got rid of this error by putting the converter in the connector's folder.
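Concretely, that means a layout and worker config along these lines (the plugin.path matches the /opt/custom-connectors location in the question's logs; the connector folder and jar names are hypothetical):

```
# connect worker properties (e.g. connect-distributed.properties)
plugin.path=/opt/custom-connectors

# layout: the converter jar sits INSIDE the connector's plugin folder,
# not beside it, so the connector's PluginClassLoader can see it
/opt/custom-connectors/
└── kafka-connect-hdfs/                     (connector plugin folder)
    ├── kafka-connect-hdfs-x.y.z.jar
    └── custom-converter-1.0-SNAPSHOT.jar   (the custom converter)
```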

I also tried something else: create a custom connector and use that custom connector with the custom converter, both loaded as plugins. It also works.

Summary: converters are loaded by the connector. If your connector is a plugin, your converter should be one as well. If your connector is not a plugin (i.e. it is bundled with your Kafka Connect distribution), your converter should not be one either.
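For reference, a converter is a class implementing the Connect Converter interface from org.apache.kafka.connect.storage. Below is a minimal sketch of what a class like custom.CustomStringConverter might look like; the interface here is a simplified stand-in so the snippet is self-contained (the real interface's methods also carry a Schema and return a SchemaAndValue):

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.connect.storage.Converter,
// so this sketch compiles without the Connect API on the classpath.
interface ConverterShape {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] fromConnectData(String topic, Object value);
    Object toConnectData(String topic, byte[] bytes);
}

// Hypothetical shape of a custom string converter: a pass-through
// between Java strings and UTF-8 bytes.
public class CustomStringConverter implements ConverterShape {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // No settings needed for this sketch.
    }

    @Override
    public byte[] fromConnectData(String topic, Object value) {
        // Serialize the Connect value to the bytes written to Kafka.
        return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object toConnectData(String topic, byte[] bytes) {
        // Deserialize bytes read from Kafka back into a Connect value.
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Packaged into a jar, this class is what value.converter=custom.CustomStringConverter would point at, and per the summary above it must be loadable by the same classloader as the connector that uses it.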

Upvotes: 10
