Kamboh

Reputation: 185

Extract and transform specific Kafka message fields for the JDBC sink connector

I have a Kafka topic that receives data from a MySQL database via the Debezium MySQL source connector. The following is the format of one of the messages:

{
    "Message": {
        "schema": {
            "type": "struct",
            "fields": [
              ...
            ],
            "optional": true,
            "name": "mysql-server-1.inventory.somename"
        },
        "payload": {
            "op": "u",
            "ts_ms": 1465491411815,
            "before": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Doof",
                "email": "[email protected]"
            },
            "after": {
                "id": 1004,
                "first_name": "Anne",
                "last_name": "Marry",
                "email": "[email protected]"
            },
            "source": {
                "db": "inventory",
                "table": "customers",
                ...
                "query": "Update customers set last_name = 'Marry' where id = 1004"
            }
        }
    }
}

I want to push ts_ms, before, after, and id (from the row object) into another database using the JDBC sink connector, with the target table schema (id, before (text), after (text), timestamp). Being new to Kafka, I can't figure out how to extract and transform these fields.

For the message above, the sink destination table should have data like this below at the end:

id:     1004
before: '{"id":1004,"first_name":"Anne","last_name":"Doof","email":"[email protected]"}'
after:  '{"id":1004,"first_name":"Anne","last_name":"Marry","email":"[email protected]"}'
timestamp: 1465491411815

Upvotes: 2

Views: 5172

Answers (2)

TechGeek

Reputation: 508

You can create a DTO (a Java object representing the JSON payload you receive from your Kafka topic). An online converter such as http://pojo.sodhanalibrary.com/ can help you generate the Java classes from your JSON.

Once you receive a message from your Kafka topic, you can use an ObjectMapper to deserialize the JSON into your DTO. Once you have the object, you can extract the fields you want by simply calling getId(), getBefore(), etc.

Here is some reference code which helps you understand:

    @KafkaListener(topics = "test")
    public void listen(String payload) throws JsonProcessingException {

        logger.info("Message received from Kafka topic: {}", payload);

        ObjectMapper objectMapper = new ObjectMapper();
        // Ignore fields in the JSON that have no counterpart in the DTO
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

        DTOObject dtoObject = objectMapper.readValue(payload, DTOObject.class);

        logger.info("After conversion: {}", objectMapper.writeValueAsString(dtoObject));
        logger.info("Id: {}", dtoObject.getId());
    }
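If you would rather not maintain DTO classes for the whole Debezium envelope, Jackson's tree model (JsonNode) can pull out just the needed fields. Below is a minimal sketch, assuming the schema/payload envelope from the question; the class and method names (DebeziumFieldExtractor, extract) are illustrative, not part of any library. It keeps before and after as raw JSON text, matching the target table schema in the question.

```java
import java.io.UncheckedIOException;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DebeziumFieldExtractor {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Returns {id, before, after, ts_ms} from a Debezium change-event envelope. */
    public static String[] extract(String message) {
        try {
            JsonNode root = MAPPER.readTree(message);
            // The sample in the question wraps the envelope in "Message";
            // real Debezium events have schema/payload at the top level.
            JsonNode envelope = root.has("Message") ? root.get("Message") : root;
            JsonNode payload = envelope.path("payload");

            String id = payload.path("after").path("id").asText();
            // Serialize before/after back to JSON text for the (text) columns
            String before = MAPPER.writeValueAsString(payload.path("before"));
            String after = MAPPER.writeValueAsString(payload.path("after"));
            String tsMs = payload.path("ts_ms").asText();
            return new String[] { id, before, after, tsMs };
        } catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String msg = "{\"payload\":{\"op\":\"u\",\"ts_ms\":1465491411815,"
                + "\"before\":{\"id\":1004,\"last_name\":\"Doof\"},"
                + "\"after\":{\"id\":1004,\"last_name\":\"Marry\"}}}";
        String[] row = extract(msg);
        System.out.println("id=" + row[0] + " ts=" + row[3]);
        // prints: id=1004 ts=1465491411815
    }
}
```

Note that this approach lives in a consumer application, not inside Kafka Connect; if you want the extraction to happen in the connector itself, see the transformation-based answer below.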

Upvotes: 0

Iskuskov Alexander

Reputation: 4375

You can use a chain of Kafka Connect transformations (SMTs), as in this solution.
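As a hedged sketch of what such a sink configuration could look like: Debezium ships an ExtractNewRecordState transformation that unwraps the change-event envelope, and its add.fields option can copy ts_ms into the flattened record. The topic name and connection.url below are placeholders, not taken from the question. Note that stock SMTs do not produce the before/after columns as JSON text, so the exact table layout in the question would likely require a custom transformation.

```json
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "mysql-server-1.inventory.customers",
    "connection.url": "jdbc:mysql://localhost:3306/targetdb",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.fields": "ts_ms"
  }
}
```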

Upvotes: 0
