TheDataGuy

Reputation: 3118

Debezium - Custom Payload - MySQL Connector

I'm using Debezium to sync data from MySQL to S3. Now I want to make some changes to the message format.

Sample insert:

create table new (id int);
insert into new values (1);

1. Custom Payload

{
    "schema": {
        "type": "struct",
        bla bla bla
        "optional": false,
        "name": "_72.31.84.129.test.new.Envelope"
    },
    "payload": {
        "before": null,
        "after": {
            "id": 10
        },
        "source": {
            "version": "0.10.0.Final",
            "connector": "mysql",
            "name": "11.11.84.129",
            "ts_ms": 1576605998000,
            "snapshot": "false",
            "db": "test",
            "table": "new",
            "server_id": 1,
            "gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
            "file": "mysql-bin.000003",
            "pos": 12770,
            "row": 0,
            "thread": 47,
            "query": null
        },
        "op": "c",
        "ts_ms": 1576605998231
    }
}

I want to push only the payload, with some custom changes: source, op, and ts_ms should sit alongside the fields from payload.after.

Expected output:

{
    "id": 10,
    "source": {
        "version": "0.10.0.Final",
        "connector": "mysql",
        "name": "11.11.84.129",
        "ts_ms": 1576605998000,
        "snapshot": "false",
        "db": "test",
        "table": "new",
        "server_id": 1,
        "gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
        "file": "mysql-bin.000003",
        "pos": 12770,
        "row": 0,
        "thread": 47,
        "query": null
    },
    "op": "c",
    "ts_ms": 1576605998231
}

I don't want schema or payload.before. I'm not sure how to get this output.

Upvotes: 1

Views: 1712

Answers (1)

Gunnar

Reputation: 19020

Check out the SMT for extracting the new record state (ExtractNewRecordState). It propagates only what's in after. Optionally, you can let it add chosen fields from source, too.

...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,pos
...

You cannot insert the op and ts_ms fields at the moment, but they can be propagated as message headers.
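
If op and ts_ms have to end up in the record body rather than in headers, one fallback is to reshape the raw envelope in a downstream step before the data is written to S3. The following is only a rough sketch of that idea, assuming the unmodified Debezium envelope arrives as a JSON string like the one in the question; flatten_envelope is a hypothetical helper name, not part of Debezium or Kafka Connect.

```python
import json


def flatten_envelope(raw: str) -> dict:
    """Hypothetical helper: merge source, op and ts_ms into the 'after' state.

    Assumes `raw` is a Debezium change event serialized as JSON, either with
    the schema/payload wrapper or as a bare payload.
    """
    envelope = json.loads(raw)
    # Use the nested payload when the schema wrapper is present.
    payload = envelope.get("payload", envelope)
    # Delete events carry no 'after' state; start from an empty record then.
    after = payload.get("after") or {}
    flat = dict(after)
    # Copy the metadata fields next to the row columns; schema and before are dropped.
    for key in ("source", "op", "ts_ms"):
        flat[key] = payload.get(key)
    return flat


if __name__ == "__main__":
    sample = json.dumps({
        "schema": {"type": "struct", "optional": False},
        "payload": {
            "before": None,
            "after": {"id": 10},
            "source": {"connector": "mysql", "db": "test", "table": "new"},
            "op": "c",
            "ts_ms": 1576605998231,
        },
    })
    print(json.dumps(flatten_envelope(sample), indent=4))
```

Note that a row column named source, op, or ts_ms would be overwritten by the metadata in this sketch, so a real pipeline may want to prefix those keys.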

Upvotes: 3
