Reputation: 1474
I'm using Confluent's Kafka Connect to pipe data into an S3 bucket, ideally partitioned by a key. Since the existing FieldPartitioner only works for Avro schema records and not for plain stringified JSON text, I thought I'd write my own partitioner.
Here's the class:
package com.package.kafka.connect;

import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class JsonFieldPartitioner<T> extends DefaultPartitioner<T> {
    private static final Logger log = LoggerFactory.getLogger(JsonFieldPartitioner.class);
    private List<String> fieldNames;
    private List<String> keys;

    @Override
    public void configure(Map<String, Object> config) {
        fieldNames = (List<String>) config.get("partition.field.name");
        String field = fieldNames.get(0);
        // String.split takes a regex, so the dot must be escaped
        keys = new ArrayList<>(Arrays.asList(field.split("\\.")));
    }

    @Override
    public String encodePartition(SinkRecord sinkRecord) {
        String value = sinkRecord.value().toString();
        JsonElement element = new JsonParser().parse(value);
        // Walk down the nested JSON object along the configured key path
        for (String key : keys) {
            log.info("key: " + key);
            try {
                element = element.getAsJsonObject().get(key);
            } catch (Exception e) {
                log.error("encountered error getting key: " + key);
                throw new ConfigException("Key element not found: " + key);
            }
        }
        try {
            return element.getAsString();
        } catch (Exception e) {
            log.error("encountered error getting key value");
            throw new ConfigException("Key element not found");
        }
    }

    @Override
    public List<T> partitionFields() {
        if (partitionFields == null) {
            partitionFields = newSchemaGenerator(config).newPartitionFields(
                    Utils.join(fieldNames, ","));
        }
        return partitionFields;
    }
}
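One bug worth noting independently of the NPE: `String.split` takes a regular expression, so `field.split(".")` produces an empty array and `encodePartition` would never descend into the JSON. A minimal stand-alone sketch of the escape and the key-path traversal (plain Java; maps stand in for parsed gson objects, and the path `a.b.c` is a placeholder):

```java
import java.util.Map;

public class SplitDemo {
    public static void main(String[] args) {
        // "." is a regex metacharacter: it matches every character, so every
        // piece is empty and trailing empties are stripped -> length 0
        System.out.println("a.b.c".split(".").length);    // 0
        // Escaping the dot yields the intended path segments
        System.out.println("a.b.c".split("\\.").length);  // 3

        // Key-path traversal analogous to encodePartition, with plain maps
        // standing in for the parsed gson JsonObject
        Object element = Map.of("a", Map.of("b", Map.of("c", "part1")));
        for (String key : "a.b.c".split("\\.")) {
            element = ((Map<?, ?>) element).get(key);
        }
        System.out.println(element);  // part1
    }
}
```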
When I build it and try to run Kafka Connect, I get this error:
java.lang.NullPointerException
at io.confluent.connect.storage.partitioner.PartitionerConfig.classNameEquals(PartitionerConfig.java:269)
at io.confluent.connect.storage.partitioner.PartitionerConfig.access$000(PartitionerConfig.java:32)
at io.confluent.connect.storage.partitioner.PartitionerConfig$PartitionerClassDependentsRecommender.visible(
From looking at Packaging a custom Java `partitioner.class` plugin for Kafka Connect in Confluent 4.1 + Kafka 1.1?, I've tried putting the jar built from this into the kafka-connect-storage-common directory in $CONFLUENT_HOME, but I still get the same error.
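For reference, Connect discovers plugins from the directories listed in the worker's plugin.path (in addition to the classpath), so the directory the jar lands in has to line up with that setting. A sketch of the relevant worker property (the path is an assumption for a confluent-5.3.0 install on this machine):

```properties
# connect worker properties (sketch; path is an assumption for this install)
plugin.path=/Users/myuser/confluent-5.3.0/share/java
```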
The Gradle file used to build the jar:

plugins {
    id 'java'
}

group 'JsonFieldPartitioner'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.kafka', name: 'connect-api', version: '2.3.0'
    compile fileTree(dir: '/Users/myuser/confluent-5.3.0/share/java/kafka-connect-storage-common', include: ['*.jar'])
    compile group: 'joda-time', name: 'joda-time', version: '2.10.3'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}
In the S3 connector properties file I just reference the class as com.package.kafka.connect.JsonFieldPartitioner.
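For reference, the relevant lines of the S3 sink properties would look something like this (a sketch; the nested path a.b.c is a placeholder for whatever JSON field you partition on):

```properties
# S3 sink connector properties (sketch; values are placeholders)
partitioner.class=com.package.kafka.connect.JsonFieldPartitioner
# consumed by configure() above and split on "." into the key path
partition.field.name=a.b.c
```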
If anyone has had success building custom partitioners, any help would be greatly appreciated.
Upvotes: 5
Views: 3074
Reputation: 11
While preparing the Kafka Connect image, we could do:

confluent-hub install --no-prompt /tmp/connect-fieldandtime-partitioner-1.2.0.zip

Here the zip is the custom partitioner, so why do we need to place the JAR in the lib folder of the service?
Upvotes: 0
Reputation: 11
Copy your custom jar file into the plugins folder, for example /usr/local/share/kafka/plugins/confluentinc-kafka-connect-s3/.
Upvotes: 1