willy

Reputation: 31

Custom Kafka Connect SMT for MongoDB: transform _id from String to ObjectId

I am developing a custom SMT (Single Message Transform) for Kafka Connect.

The goal is to transform the _id field, which arrives from Debezium as a string, into a MongoDB ObjectId. Here is my code:

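import java.util.Map;

import com.mongodb.MongoClientSettings;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.bson.codecs.configuration.CodecRegistries;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.codecs.pojo.PojoCodecProvider;
import org.bson.types.ObjectId;

public class StringToObjectIdV2<R extends ConnectRecord<R>> implements Transformation<R> {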
    public static final String CODEC_SETTING = "codec.setting";
    CodecRegistry codecRegistry = CodecRegistries.fromRegistries(
            MongoClientSettings.getDefaultCodecRegistry(),
            CodecRegistries.fromProviders(PojoCodecProvider.builder().automatic(true).build())
    );

    @Override
    public R apply(R record) {
        if (record.value() instanceof Map) {
            Map<String, Object> valueMap = (Map<String, Object>) record.value();
            // Assuming the _id field is present and is a String
            String idString = (String) valueMap.get("_id");
            try {
                if (idString != null && ObjectId.isValid(idString)) {
                    // Convert the string to ObjectId
                    ObjectId objectId = new ObjectId(idString);
                    // Replace the String _id with the ObjectId
                    valueMap.put("_id", objectId);
                }
            } catch (Exception e) {
                // getCause() can be null, so print the exception itself
                e.printStackTrace();
            }

        }
        // Return the transformed record
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define(CODEC_SETTING, ConfigDef.Type.STRING, "default_codec",
                        ConfigDef.Importance.MEDIUM, "The codec to use for BSON types");
    }


    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
        String codecSetting = (String) configs.get(CODEC_SETTING);

        if ("customCodec".equals(codecSetting)) {
            codecRegistry = CodecRegistries.fromRegistries(
                    CodecRegistries.fromCodecs(new ObjectIdCodecCustom())
            );
        }
    }

    public CodecRegistry getCodecRegistry() {
        return codecRegistry;
    }
}
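
To rule out the input itself, here is a minimal driver-free sketch of the check I believe `ObjectId.isValid` performs (exactly 24 hexadecimal characters) and of decoding those characters into the 12 bytes of an ObjectId. The class name `ObjectIdHex` and its methods are hypothetical helpers written for this post, not part of the MongoDB driver, and returning false for null is my own choice rather than the driver's behavior.

```java
/** Hypothetical helper: driver-free check and decoding of a 24-character hex _id. */
final class ObjectIdHex {

    /** Returns true only if s is exactly 24 ASCII hexadecimal characters. */
    static boolean isValidHex(String s) {
        if (s == null || s.length() != 24) {
            return false;
        }
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            boolean hex = (c >= '0' && c <= '9')
                    || (c >= 'a' && c <= 'f')
                    || (c >= 'A' && c <= 'F');
            if (!hex) {
                return false;
            }
        }
        return true;
    }

    /** Decodes a valid 24-character hex string into its 12 raw bytes. */
    static byte[] toBytes(String s) {
        if (!isValidHex(s)) {
            throw new IllegalArgumentException("not a 24-character hex string: " + s);
        }
        byte[] out = new byte[12];
        for (int i = 0; i < out.length; i++) {
            // Each byte is encoded as two hex characters.
            out[i] = (byte) Integer.parseInt(s.substring(2 * i, 2 * i + 2), 16);
        }
        return out;
    }

    public static void main(String[] args) {
        String id = "507f1f77bcf86cd799439011"; // sample value, not from my data
        System.out.println(isValidHex(id));
        System.out.println(toBytes(id).length);
    }
}
```

The _id values in my records pass this kind of check, so the string-to-ObjectId conversion inside apply() does not seem to be where things fail.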

When Kafka Connect runs the code, I always get this error:

(com.mongodb.kafka.connect.sink.MongoProcessedSinkRecordData)
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class org.bson.types.ObjectId.
        at org.bson.internal.CodecCache.lambda$getOrThrow$1(CodecCache.java:52)

All dependencies are included in my jar.

Any suggestions?

Thanks

Luca

Upvotes: 0

Views: 26

Answers (0)
