Naman Kumar

Reputation: 85

How to modify/update the data before sending it downstream

I have a topic which has data in the format

{
 before: {...},
 after: {...},
 source: {...},
 op: 'u'
}

The data was produced by Debezium. I want to send the data to a SQL Server database table, so I selected the JDBC Sink Connector. I need to process the data before sending it downstream.

Logic that needs to be applied:

  1. if op = 'u' or op = 'c' or op = 'r' // update or insert or snapshot

    select all the fields present in 'after' and perform an upsert downstream.

  2. if op = 'd' // delete

    select all the fields present in 'before', add a field IsActive=false, and perform an upsert downstream.

How can I achieve this?
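To make the intent concrete, this is roughly the mapping I have in mind (illustrative pseudocode only; upsert, after and before are placeholders, not actual API calls):

// illustrative pseudocode of the desired mapping, not working code
if (op.equals("u") || op.equals("c") || op.equals("r")) {
    upsert(after);                     // forward every field from 'after'
} else if (op.equals("d")) {
    before.put("IsActive", false);     // add the flag to the 'before' state
    upsert(before);
}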

Upvotes: 0

Views: 774

Answers (2)

Naman Kumar

Reputation: 85

I was able to achieve this using a custom transform (SMT) in the JDBC sink connector. I extracted the after field and the op field and applied the logic. There is no direct way to update the record, i.e. there is no setSchema or setValue method, so I used reflection to update the schema and the value.

Below are the relevant code snippets:

// ExtractField delegates; they are configured (not shown here) to pull the
// "before", "after" and "op" fields out of the Debezium envelope.
private final ExtractField<R> afterDelegate = new ExtractField.Value<R>();
private final ExtractField<R> beforeDelegate = new ExtractField.Value<R>();
private final ExtractField<R> operationDelegate = new ExtractField.Value<R>();

public R apply(R record) {
    // Read the Debezium "op" field to decide how to treat the record.
    R operationRecord = operationDelegate.apply(record);
    String op = String.valueOf(operationRecord.value());
    boolean isDeletedRecord = op.equalsIgnoreCase(Operation.DELETE.getValue());
    ...
    finalRecord = afterDelegate.apply(record);
    if (isDeletedRecord) {
        addDeletedFlag(finalRecord);
    }
}
private void addDeletedFlag(R finalRecord) {
    // Rebuild the value schema with one extra optional boolean field for the delete flag.
    final SchemaBuilder builder = SchemaBuilder.struct();
    builder.name(finalRecord.valueSchema().name());
    for (Field f : finalRecord.valueSchema().fields()) {
        builder.field(f.name(), f.schema());
    }
    builder.field(deleteFlagName, Schema.OPTIONAL_BOOLEAN_SCHEMA);
    Schema newValueSchema = builder.build();
    try {
        // ConnectRecord.valueSchema is private and final, so overwrite it via reflection.
        java.lang.reflect.Field schemaField = finalRecord.getClass().getSuperclass().getDeclaredField("valueSchema");
        schemaField.setAccessible(true);
        schemaField.set(finalRecord, newValueSchema);
    } catch (Exception e) {
        e.printStackTrace();
    }

    Struct valueStruct = (Struct) finalRecord.value();
    updateValueSchema(valueStruct, finalRecord.valueSchema());
    updateValue(finalRecord.value(), true);
}
private void updateValueSchema(Object o, Schema newSchema) {
    if (!(o instanceof Struct)) {
        return;
    }
    Struct value = (Struct) o;
    try {
        // Struct.schema is private and final as well, so replace it via reflection.
        java.lang.reflect.Field schemaField = value.getClass().getDeclaredField("schema");
        schemaField.setAccessible(true);
        schemaField.set(value, newSchema);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
private void updateValue(Object o, Object newValue) {
    if (!(o instanceof Struct)) {
        return;
    }
    Struct value = (Struct) o;
    try {
        // Struct.values is a private Object[] sized to the old schema;
        // append the delete-flag value so it lines up with the new last field.
        java.lang.reflect.Field valuesField = value.getClass().getDeclaredField("values");
        valuesField.setAccessible(true);
        Object[] newValueArray = ((Object[]) valuesField.get(value)).clone();
        List<Object> newValueList = new ArrayList<>(Arrays.asList(newValueArray));
        newValueList.add(newValue);
        valuesField.set(value, newValueList.toArray());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
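
To use it, the custom transform is then registered in the JDBC sink connector configuration. A minimal sketch, assuming the transform class is com.example.DebeziumFlattenTransform (the class name and the alias debeziumFlatten are placeholders for whatever you named your SMT):

# excerpt of the JDBC sink connector properties; only the SMT wiring is shown
transforms=debeziumFlatten
transforms.debeziumFlatten.type=com.example.DebeziumFlattenTransform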

Upvotes: 0

Suren Aznauryan

Reputation: 1094

If it is not mandatory for you to receive the complex Debezium message in the Kafka topic, check Debezium's New Record State Extraction SMT. You will need to configure it in the Debezium connector configuration, and if you use it with delete.handling.mode=rewrite you will get a __deleted field in your messages, which will serve the purpose of the IsActive field you have indicated in your question.
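A minimal sketch of the relevant part of the Debezium source connector configuration (the alias unwrap is just a conventional name for the transform):

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite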

The simplified format of the messages you receive in Kafka will then match the format the JDBC sink connector expects, although you might still need to apply some of the Single Message Transforms for Confluent Platform in the JDBC sink connector's configuration in order to filter some fields, rename some fields, etc.
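For example, renaming the __deleted field to IsActive on the sink side could look roughly like this with the built-in ReplaceField transform (the alias rename is a placeholder):

transforms=rename
transforms.rename.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.rename.renames=__deleted:IsActive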

As a side benefit, you will also send much less data to Kafka.

Upvotes: 1
