Hessam

Reputation: 1415

Consuming from multiple Kafka topics

I want to write a Kafka application that consumes from topics and saves something in a database. The topics are created by the Debezium Kafka connector based on the MySQL binlog, so I have one topic per table. This is the code I am using for consuming from one topic:

KStream<GenericRecord, mysql.company.tiers.Envelope> tierStream = builder.stream("mysql.alopeyk.tiers",
                Consumed.with(TierSerde.getGenericKeySerde(), TierSerde.getEnvelopeSerde()));

From an architectural point of view, I should create a KStream for each table and run them in parallel, but the number of tables is so large that having that many streams (and threads) may not be the best option.

All the tables have a column called created_at (it is a Laravel app), so I am curious whether there is a way to have a generic Serde for values that extracts this common column. This is the only column whose value I am interested in, besides the name of the table.

Upvotes: 0

Views: 438

Answers (1)

Bartosz Wardziński

Reputation: 6613

It all depends on how your values are serialized by the application that produced the messages (the connector). If the Deserializer (Serde) can extract created_at from the different types of messages, it is possible.

So the answer is yes, but it depends on your message values and Deserializer.

Assuming all your messages after serialization have a format like the following:

  • created_at;name:position;...
  • created_at;city,country;...
  • created_at;product_name;...

In that case, the Deserializer only needs to take the characters up to the first ;, parse them as a date, and drop the rest of the value.

Sample code:

import java.nio.charset.StandardCharsets;
import java.util.Date;

import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<Date> {

    @Override
    public Date deserialize(String topic, byte[] data) {
        // Keep the characters up to the first ';', parse them as epoch
        // milliseconds, and drop the rest of the value.
        String strDate = new String(data, StandardCharsets.UTF_8);
        return new Date(Long.parseLong(strDate.substring(0, strDate.indexOf(";"))));
    }
}
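
For completeness, here is a minimal sketch of how such a deserializer could be wrapped in a Serde and used to consume all table topics with a single KStream through a topic pattern, so one stream per table is not needed. The topic pattern, the String key Serde, and the no-op serializer are assumptions for illustration (the lambda also assumes Kafka 2.x+, where the Serializer interface's other methods have defaults):

import java.util.Date;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class CreatedAtTopology {

    public static KStream<String, Date> build(StreamsBuilder builder) {
        // Wrap the custom deserializer in a Serde; the serializer side is
        // unused when only consuming, so a no-op placeholder is enough here.
        Serde<Date> createdAtSerde = Serdes.serdeFrom(
                (topic, date) -> new byte[0],   // placeholder serializer (assumption)
                new CustomDeserializer());

        // Subscribe to every table topic with one stream via a regex pattern
        // (the pattern is hypothetical; adjust it to your topic names).
        return builder.stream(
                Pattern.compile("mysql\\.alopeyk\\..*"),
                Consumed.with(Serdes.String(), createdAtSerde));
    }
}

Note that deserialize() also receives the topic name as its first argument, so the table name you are interested in is available at the same place.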

Upvotes: 1
