Reputation: 3118
I'm using Debezium to sync data from MySQL to S3. Now I want to make some changes to the message format.
Sample insert:
create table new (id int);
insert into new values (1);
{
"schema": {
"type": "struct",
bla bla bla
"optional": false,
"name": "_72.31.84.129.test.new.Envelope"
},
"payload": {
"before": null,
"after": {
"id": 10
},
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
}
I want to push only the payload, with some custom changes: the source, op, and ts_ms fields should be included alongside the contents of payload.after. Desired output:
{
"id": 10,
"source": {
"version": "0.10.0.Final",
"connector": "mysql",
"name": "11.11.84.129",
"ts_ms": 1576605998000,
"snapshot": "false",
"db": "test",
"table": "new",
"server_id": 1,
"gtid": "3a7b90e9-207e-11ea-b3ed-121a0cbac3cb:51",
"file": "mysql-bin.000003",
"pos": 12770,
"row": 0,
"thread": 47,
"query": null
},
"op": "c",
"ts_ms": 1576605998231
}
I don't want schema or payload.before. I'm not sure how to get this output.
Upvotes: 1
Views: 1712
Reputation: 19020
Check out the SMT for extracting the new record state. It will only propagate what's in after. Optionally, you can let it add chosen fields from source, too.
...
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.source.fields=table,lsn
...
You cannot insert the op and ts_ms fields at the moment, but they can be propagated as message headers.
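If the exact shape from the question is needed, one workaround is to skip the SMT and reshape the full envelope in a downstream step before writing to S3. The following is only a rough sketch of that reshaping in Python, not part of Debezium: the function name flatten_envelope is made up for illustration, and it assumes the message is the JSON envelope shown in the question (with the schema part shortened here).

```python
import json


def flatten_envelope(event):
    """Hypothetical helper: merge source, op and ts_ms into the 'after' state.

    Returns None when there is no 'after' state (e.g. for delete events).
    """
    payload = event["payload"]
    after = payload.get("after")
    if after is None:
        return None
    flat = dict(after)  # copy so the original event is left untouched
    for key in ("source", "op", "ts_ms"):
        flat[key] = payload.get(key)
    return flat


# Shortened version of the change event from the question.
event = {
    "schema": {"type": "struct", "optional": False},
    "payload": {
        "before": None,
        "after": {"id": 10},
        "source": {"connector": "mysql", "db": "test", "table": "new"},
        "op": "c",
        "ts_ms": 1576605998231,
    },
}

result = flatten_envelope(event)
print(json.dumps(result, indent=2))
```

The schema and before parts are dropped simply because they are never copied. A column in the table that is itself named source, op or ts_ms would be overwritten by this sketch, so a prefix on the added keys may be safer.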
Upvotes: 3