I am developing a custom SMT (Single Message Transform) for Kafka Connect.
The goal is to transform the _id field from the string format produced by Debezium into a MongoDB ObjectId. Here is my code:
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.types.ObjectId;

import com.mongodb.MongoClientSettings;

public class StringToObjectIdV2<R extends ConnectRecord<R>> implements Transformation<R> {

    public static final String CODEC_SETTING = "codec.setting";

    // Default registry: the driver's built-in codecs plus automatic POJO support
    CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
            MongoClientSettings.getDefaultCodecRegistry(),
            CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
    );

    @Override
    @SuppressWarnings("unchecked")
    public R apply(R record) {
        if (record.value() instanceof Map) {
            Map<String, Object> valueMap = (Map<String, Object>) record.value();
            // Assuming the _id field is present and is a String
            String idString = (String) valueMap.get("_id");
            try {
                if (idString != null && ObjectId.isValid(idString)) {
                    // Convert the string to ObjectId
                    ObjectId objectId = new ObjectId(idString);
                    // Replace the String _id with the ObjectId
                    valueMap.put("_id", objectId);
                }
            } catch (Exception e) {
                // getCause() can be null, so print the exception itself
                e.printStackTrace();
            }
        }
        // Return the transformed record
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define(CODEC_SETTING, ConfigDef.Type.STRING, "default_codec",
                        ConfigDef.Importance.MEDIUM, "The codec to use for BSON types");
    }

    @Override
    public void close() {
        // Nothing to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        String codecSetting = (String) configs.get(CODEC_SETTING);
        // Swap in the custom ObjectId codec only when explicitly requested
        if ("customCodec".equals(codecSetting)) {
            codecRegistry = CodecRegistries.fromRegistries(
                    CodecRegistries.fromCodecs(new ObjectIdCodecCustom())
            );
        }
    }

    public CodecRegistry getCodecRegistry() {
        return codecRegistry;
    }
}
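For context, the `ObjectId.isValid(idString)` guard in `apply` only lets through strings of exactly 24 hexadecimal characters, so any other `_id` value is left unchanged. Below is a minimal, standard-library-only sketch of that kind of check. The names `HexIdCheck` and `looksLikeObjectId` are hypothetical, and this is an illustration rather than the driver's own implementation:

```java
final class HexIdCheck {

    private HexIdCheck() {
    }

    // Returns true only for strings of exactly 24 ASCII hex characters.
    // Assumption: unlike the driver's ObjectId.isValid, this sketch returns
    // false for null instead of throwing.
    static boolean looksLikeObjectId(String candidate) {
        if (candidate == null || candidate.length() != 24) {
            return false;
        }
        for (int i = 0; i < candidate.length(); i++) {
            char c = candidate.charAt(i);
            boolean isHex = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'f')
                    || (c >= 'A' && c <= 'F');
            if (!isHex) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // A 24-character hex string passes; a short string does not.
        System.out.println(looksLikeObjectId("507f1f77bcf86cd799439011"));
        System.out.println(looksLikeObjectId("12345"));
    }
}
```

With a check like this, a record whose `_id` is not a 24-character hex string passes through the transform untouched.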
When Kafka Connect runs this code, I always get the following error:
(com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class org.bson.types.ObjectId.
at org.bson.internal.CodecCache.lambda$getOrThrow$1(CodecCache.java:52)
All dependencies are included in my JAR.
Any suggestions?
Thanks
Luca