Eriton Silva

Reputation: 131

How to consume Kafka messages from a topic with multiple Avro schemas using C# and Confluent.Kafka

I need to consume messages from a single topic that carries multiple Avro schemas.

I am using the C# libraries Confluent.SchemaRegistry and Confluent.Kafka to build my consumer.

I tried using the GenericRecord type to deserialize the messages without passing the Avro schema, but the deserialization does not work well: the result comes back as a string that is not valid JSON.

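using Avro.Generic;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

// Builds a consumer whose values are deserialized as Avro GenericRecord,
// with the writer schema looked up in the Schema Registry.
public IConsumer<string, GenericRecord> Consumer =>
    new ConsumerBuilder<string, GenericRecord>(_consumerConfig)
        .SetValueDeserializer(new AvroDeserializer<GenericRecord>(
            new CachedSchemaRegistryClient(_schemaRegistryConfig)).AsSyncOverAsync())
        .Build();

var consumer = _kafkaClienteConsumerFactory.Consumer;
consumer.Subscribe(_configuration["Kafka:Topic"]);
result = consumer.Consume();
// The string stored here is the one that is not valid JSON.
Mensagens.Add(result.Message.Value.ToString());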

Upvotes: 0

Views: 2231

Answers (1)

Yevhen Cherkes

Reputation: 861

Confluent.Kafka with Confluent.SchemaRegistry doesn't have this feature out of the box.

Some people use a double serialization/deserialization approach (raw record -> generic record -> specific record), as described in this article and in the discussion on this issue.
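For context on why a per-message schema choice is possible at all: Confluent's Schema Registry serializers prefix every payload with a magic byte (0) followed by a 4-byte big-endian schema ID, and only then the Avro-encoded data. Below is a minimal sketch of reading that header with the base class library only. `ConfluentWireFormat`, `ReadSchemaId` and `AvroPayload` are hypothetical names chosen for illustration, not part of Confluent.Kafka.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper (not a Confluent.Kafka API): inspects the framing that
// Confluent's Schema Registry serializers put in front of the Avro payload.
public static class ConfluentWireFormat
{
    private const byte MagicByte = 0;   // first byte of every framed message
    private const int HeaderSize = 5;   // 1 magic byte + 4-byte schema ID

    // Returns the Schema Registry ID of the schema the message was written with.
    public static int ReadSchemaId(ReadOnlySpan<byte> message)
    {
        if (message.Length < HeaderSize)
            throw new ArgumentException("Message is shorter than the 5-byte header.", nameof(message));
        if (message[0] != MagicByte)
            throw new ArgumentException("Unexpected magic byte; not Schema Registry framing.", nameof(message));

        // The schema ID is stored as a big-endian 32-bit integer.
        return BinaryPrimitives.ReadInt32BigEndian(message.Slice(1, 4));
    }

    // Returns the Avro-encoded bytes that follow the header.
    public static ReadOnlySpan<byte> AvroPayload(ReadOnlySpan<byte> message)
    {
        ReadSchemaId(message); // validates length and magic byte
        return message.Slice(HeaderSize);
    }
}
```

With a raw `byte[]` value (for example from an `IConsumer<string, byte[]>`), `ConfluentWireFormat.ReadSchemaId(bytes)` yields the ID you could use to decide which record type to deserialize into. How IDs map to types depends on your registry and is left out here.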

Alternatively, you can use my Multi Schema Avro Deserializer (GitHub repository, NuGet package). It is listed under 3rd Party Libraries in Confluent's .NET client repository.

Example:

// Values are deserialized to whichever ISpecificRecord type matches the message's schema.
IConsumer<string, ISpecificRecord> consumer =
    new ConsumerBuilder<string, ISpecificRecord>(_consumerConfig)
        .SetValueDeserializer(new MultiSchemaAvroDeserializer(
            new CachedSchemaRegistryClient(_schemaRegistryConfig)).AsSyncOverAsync())
        .Build();

consumer.Subscribe(_configuration["Kafka:Topic"]);
var result = consumer.Consume();
List<ISpecificRecord> Mensagens = new List<ISpecificRecord>();
Mensagens.Add(result.Message.Value);

You can also look at the kafka-flow library. It provides a similar multi-type Avro serializer/deserializer and a dispatcher out of the box.
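Whichever deserializer you pick, the consumer ends up handing you values of different concrete types, so you need some way to route each one to the right handler. Here is a minimal sketch of a type-keyed dispatcher, using only the base class library. `TypeDispatcher`, `On` and `Dispatch` are hypothetical names for illustration; this is not kafka-flow's API or any other library's.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: routes a message to the handler registered
// for its exact runtime type.
public sealed class TypeDispatcher<TBase> where TBase : class
{
    private readonly Dictionary<Type, Action<TBase>> _handlers =
        new Dictionary<Type, Action<TBase>>();

    // Registers a handler for messages whose runtime type is exactly T.
    public TypeDispatcher<TBase> On<T>(Action<T> handler) where T : class, TBase
    {
        if (handler == null) throw new ArgumentNullException(nameof(handler));
        _handlers[typeof(T)] = message => handler((T)(object)message);
        return this;
    }

    // Invokes the matching handler; returns false when none is registered.
    public bool Dispatch(TBase message)
    {
        if (message == null) throw new ArgumentNullException(nameof(message));
        if (_handlers.TryGetValue(message.GetType(), out var handle))
        {
            handle(message);
            return true;
        }
        return false;
    }
}
```

With the consumer above you would create a `TypeDispatcher<ISpecificRecord>`, register one `On<T>` handler per generated record class, and call `Dispatch(result.Message.Value)` for each consumed message. Matching is by exact type only, so a `false` return is the place to log or skip schemas you do not handle.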

Upvotes: 0
