Arkadiy Verman

Reputation: 694

Kafka Connect with custom timestamp.extractor

I'm having trouble adding a JAR to the Kafka Connect classpath while trying to read messages from Kafka into S3.

The goal is to write messages into partitions based on a timestamp that is part of the key of each Kafka message.

In short, I have to provide a custom timestamp extractor. Following the documentation here, I created a class that implements the TimestampExtractor interface and added the JAR's location to the plugin.path property.

The problem is that when I start Connect, the class is not found. Somehow the JAR is not on the classpath, and I get:

org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor

Additional data:

version: Confluent 4.0.0

connect: Connect Standalone

Starting command:

sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \
  /home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \
  /home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties

I'd appreciate any help.

Upvotes: 4

Views: 1580

Answers (1)

Konstantine Karantasis

Reputation: 1993

To make custom timestamp extractor classes available to your S3 connector, you need to do the following:

  • Place the JAR with your custom classes alongside the connector's other dependencies. For example:

    Save it under ./share/java/kafka-connect-s3 if you want it to be available only to the S3 connector, or under ./share/java/kafka-connect-storage-common to make it available to all storage sink connectors (currently the S3 and HDFS connectors).

  • Make sure your custom class implements the io.confluent.connect.storage.partitioner.TimestampExtractor interface.
  • Use the fully qualified class name when you set the timestamp.extractor property in your connector's configuration, and make sure it matches the package you've defined and packaged in your JAR. For instance:

    timestamp.extractor=me.connectors.MyTimestampExtractor

Finally, you'd follow a similar process to make a custom partitioner available to your connector.
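
As a rough illustration of the extraction logic such a class might contain, here is a minimal sketch in plain Java. It assumes, purely for the example, that the record key is a string of the form "<id>|<epochMillis>"; the class name KeyTimestampParser, the method name, and the key format are all hypothetical and not taken from the question. A real extractor would implement io.confluent.connect.storage.partitioner.TimestampExtractor and, as far as I recall, call a helper like this from its extract method with the record's key; check the interface in your storage-common version for the exact method signatures.

```java
/**
 * Hypothetical helper (not part of any Confluent API): pulls an
 * epoch-millisecond timestamp out of a record key.
 */
final class KeyTimestampParser {

    private KeyTimestampParser() {
        // Static utility class; no instances.
    }

    /**
     * Assumes keys look like "<id>|<epochMillis>", for example
     * "campaign-7|1514764800000". The text after the LAST '|' is
     * parsed as a long.
     *
     * @return the timestamp in milliseconds, or null if the key is null,
     *         has no separator, or does not end in a valid number
     */
    static Long parseEpochMillis(String key) {
        if (key == null) {
            return null;
        }
        int sep = key.lastIndexOf('|');
        if (sep < 0 || sep == key.length() - 1) {
            return null; // no separator, or nothing after it
        }
        try {
            return Long.parseLong(key.substring(sep + 1).trim());
        } catch (NumberFormatException e) {
            return null; // trailing segment is not a number
        }
    }
}
```

Returning null keeps the sketch simple; in an actual extractor you would probably want to decide explicitly whether a malformed key should fail the task or fall back to another timestamp.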

Upvotes: 5
