Reputation: 1415
I want to write a Kafka application that consumes from topics and saves something in a database. The topics are created by a Debezium Kafka connector based on the MySQL binlog, so I have one topic per table. This is the code I am using for consuming from one topic:
KStream<GenericRecord, mysql.company.tiers.Envelope>[] tierStream = builder.stream("mysql.alopeyk.tiers",
        Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));
From an architectural point of view, I should create a KStream for each table and run them in parallel. But the number of tables is very large, and having that many threads may not be the best option.
All the tables have a column called created_at (it is a Laravel app), so I am curious whether there is a way to have a generic Serde for values that extracts this common column. Besides the table name, this is the only column whose value I am interested in.
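As a side note on the threading concern: a single KStream does not have to mean a single topic. StreamsBuilder.stream also accepts a java.util.regex.Pattern, so one stream (with the thread count controlled separately via num.stream.threads) can subscribe to every Debezium table topic at once. A minimal sketch of such a pattern, assuming the mysql.alopeyk.&lt;table&gt; topic naming from the question; the Kafka Streams call is shown only as a comment since it needs a running topology:

```java
import java.util.regex.Pattern;

public class TopicPattern {
    public static void main(String[] args) {
        // One regex covering every Debezium table topic under the "mysql.alopeyk" prefix
        // (the server/database prefix comes from the question; adjust to your connector config).
        Pattern debeziumTopics = Pattern.compile("mysql\\.alopeyk\\..+");

        // In the Streams app this pattern would replace the single topic name, e.g.:
        // builder.stream(debeziumTopics, Consumed.with(keySerde, valueSerde));

        System.out.println(debeziumTopics.matcher("mysql.alopeyk.tiers").matches()); // true
        System.out.println(debeziumTopics.matcher("mysql.alopeyk.users").matches()); // true
        System.out.println(debeziumTopics.matcher("other.topic").matches());         // false
    }
}
```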
Upvotes: 0
Views: 438
Reputation: 6613
It all depends on how your value is serialized by the application that produced the messages (the connector).
If the Deserializer (via Serdes) can extract created_at from the different message types, then it is possible. So the answer is yes, but it depends on your message value and your Deserializer.
Assuming all your serialized messages start with the created_at timestamp followed by a semicolon and the rest of the payload (for example: 1577836800000;rest-of-value), the Deserializer only needs to take the characters up to the first ';', parse them as a date, and drop the rest of the value.
Sample code:
import java.util.Date;

import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<Date> {

    @Override
    public Date deserialize(String topic, byte[] data) {
        // Extract the leading created_at timestamp (epoch millis) up to the first ';'
        String strDate = new String(data);
        return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
    }
}
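The parsing step can be checked in isolation. A rough, standalone sketch of the same logic (without the Kafka Deserializer interface, so it runs with no dependencies; the "epochMillis;rest" layout and sample payload are assumptions for illustration):

```java
import java.util.Date;

public class CreatedAtParseDemo {
    // Same parsing logic as the deserializer above, assuming an "epochMillis;rest" layout
    static Date parseCreatedAt(byte[] data) {
        String s = new String(data);
        return new Date(Long.parseLong(s.substring(0, s.indexOf(';'))));
    }

    public static void main(String[] args) {
        byte[] value = "1577836800000;id=42,name=foo".getBytes();
        Date createdAt = parseCreatedAt(value);
        System.out.println(createdAt.getTime()); // prints 1577836800000
    }
}
```

To use this from Kafka Streams, the deserializer can be paired with a matching Serializer via Serdes.serdeFrom and passed to Consumed.with as the value Serde.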
Upvotes: 1