Reputation: 1415
I want to filter my messages based on their ts_ms time. The problem is I cannot get ts_ms from avro messages. This is my trimmed down avro .avsc file:
{
"type": "record",
"name": "Envelope",
"namespace": "mysql.company.scores",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
<Some fields based on scores table>
],
"connect.name": "mysql.company.scores.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "version",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "connector",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "name",
"type": "string"
},
{
"name": "server_id",
"type": "long"
},
{
"name": "ts_sec",
"type": "long"
},
{
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "file",
"type": "string"
},
{
"name": "pos",
"type": "long"
},
{
"name": "row",
"type": "int"
},
{
"name": "snapshot",
"type": [
{
"type": "boolean",
"connect.default": false
},
"null"
],
"default": false
},
{
"name": "thread",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "db",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "query",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.debezium.connector.mysql.Source"
}
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
}
],
"connect.name": "mysql.company.scores.Envelope"
}
I can access before or after, but when I can the following method with getTs_ms, I get symbol cannot be find method:
private boolean isRecordNew(mysql.company.scores.Envelope value){
return value.getTs_ms() > 1580988600000L;
}
This is the relevant part of my serde class:
public static Serde<mysql.company.scores.Envelope> getEnvelopeSerde() {
SpecificAvroSerde<mysql.company.scores.Envelope> scoreSerde = new SpecificAvroSerde();
scoreSerde.configure(
Collections.singletonMap(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
schemaRegistryUrl), false);
return scoreSerde;
}
Shall I be able to access the ts_ms field with the same serde class or I should change it in order to have it included in the value?
Upvotes: 0
Views: 424
Reputation: 1415
As @cricket_007 mentioned in a comment I looked at the generated class and the field is named getTsMs()
and by using this method it was solved.
Upvotes: 1