Reputation: 694
I'm having a problem to add jar to the Kafka connect class path, while trying to read messages from Kafka to S3.
The goal is to write messages in partitions based on timestamp, which is part of the Key in the Kafka message.
To make the story short I have to provide custom timestamp extractor. Following the documentation here created a class that implements TimestampExtractor
interface and added a JAR location to the plugin.path
property.
The problem is that when I start the connect, the class is not found. Somehow the jar is not in the classpath and I'm getting
org.apache.kafka.common.config.ConfigException: Invalid timestamp extractor: partitioner.SpotadDateTimeExtractor
Additional data:
version: Confluent 4.0.0
connect: Connect Standalone
Starting command:
sudo /home/ubuntu/confluent-4.0.0/bin/connect-standalone \
/home/ubuntu/confluent-4.0.0/etc/kafka/connect-standalone.properties \
/home/ubuntu/confluent-4.0.0/etc/kafka-connect-s3/quickstart-s3.properties
Aprreaciate any help.
Upvotes: 4
Views: 1580
Reputation: 1993
To make custom timestamp extractor classes available to your S3 connector you will need the following:
Add the jar with your custom classes along with the other connector's dependencies. Example:
Save under ./share/java/kafka-connect-s3
if you want this to be
available only in S3 connector, or in
./share/java/kafka-connect-storage-common
to make it available to
all storage sink connectors (currently S3 and HDFS connectors).
io.confluent.connect.storage.partitioner.TimestampExtractor
interface. Use the fully qualified class name when you set the timestamp.extractor
property in your connector's configuration and of course make sure it matches the package you've defined and packaged in your jar. For instance:
timestamp.extractor=me.connectors.MyTimestampExtractor
Finally, you'd follow a similar process to make a custom partitioner available to your connector.
Upvotes: 5