VKR

Reputation: 685

How to serialize a .NET class to Avro.Generic.GenericRecord for publishing to a Kafka topic?

I am trying to find a way or helper to convert a .NET class to an Avro.Generic.GenericRecord. Currently, I add each field name and field value to the GenericRecord manually. Is there a serializer or converter that I can use to convert the object to a GenericRecord and publish it to a Kafka topic?

    using System.Collections.Generic;

    class Plant
    {
        public long Id { get; set; }
        public string Name { get; set; }
        public List<PlantProperties> PlantProperties { get; set; }
    }

    class PlantProperties
    {
        public long Leaves { get; set; }
        public string Color { get; set; }
    }

Please suggest.

Upvotes: 2

Views: 8572

Answers (2)

VKR

Reputation: 685

Below are the steps I took to solve the problem, following the suggestion from @cricket_007.

  1. To avoid the complexity of writing the Avro schema by hand, create the C# classes first, then use AvroSerializer to generate the schema:

    AvroSerializer.Create<MYClass>().WriterSchema.ToString()

  2. This generates the schema JSON for the class. Move it to a schema file.
  3. Make the types nullable where required.
  4. Run the avrogen.exe tool on the schema file to regenerate class files that implement ISpecificRecord.
  5. Use the code below to publish to the Kafka topic:

    using (var serdeProvider = new AvroSerdeProvider(avroConfig))
    using (var producer = new Producer<string, MYClass>(
        producerConfig,
        serdeProvider.GetSerializerGenerator<string>(),
        serdeProvider.GetSerializerGenerator<MYClass>()))
    {
        Console.WriteLine($"{producer.Name} producing on {_appSettings.PullListKafka.Topic}.");

        producer
            .ProduceAsync(
                _appSettings.PullListKafka.Topic,
                new Message<string, MYClass> { Key = Guid.NewGuid().ToString(), Value = MYClassObject })
            // Log the delivery result or the error once the send completes.
            .ContinueWith(task => Console.WriteLine(task.IsFaulted
                ? $"error producing message: {task.Exception.Message}"
                : $"produced to: {task.Result.TopicPartitionOffset}"))
            // Block until the delivery report arrives so the producer is not disposed first.
            .Wait();
    }

Some links that helped with this:

https://shanidgafur.github.io/blog/apache-avro-on-dotnet
https://github.com/SidShetye/HelloAvro/tree/master/Avro
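
As a rough illustration of the step about making types nullable: in Avro, a field is nullable when its type is a union that includes null. The sketch below assumes the Apache.Avro NuGet package and a hypothetical, trimmed-down schema for the Plant class from the question (the Example namespace and the choice of Name as the nullable field are assumptions, not the schema actually generated). It parses the JSON and lists each field's type, which is one way to sanity-check an edited schema before running code generation on it.

```csharp
using System;
using Avro;

public static class SchemaCheck
{
    // Hypothetical schema: Name is made nullable via a ["null", "string"] union.
    private const string PlantSchemaJson = @"{
      ""type"": ""record"",
      ""name"": ""Plant"",
      ""namespace"": ""Example"",
      ""fields"": [
        { ""name"": ""Id"", ""type"": ""long"" },
        { ""name"": ""Name"", ""type"": [""null"", ""string""], ""default"": null }
      ]
    }";

    public static void Main()
    {
        // Schema.Parse throws SchemaParseException if the JSON is not a valid Avro schema.
        var schema = (RecordSchema)Schema.Parse(PlantSchemaJson);

        foreach (Field field in schema.Fields)
        {
            // Schema.ToString() returns the JSON form of the field's type.
            Console.WriteLine($"{field.Name}: {field.Schema}");
        }
    }
}
```

Putting "null" first in the union lets the field carry a default of null, since an Avro default value has to match the first branch of the union.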

Upvotes: 1

OneCricketeer

Reputation: 191854

Assuming you are using the Confluent Schema Registry, you can use their .NET client [1]:

https://github.com/confluentinc/confluent-kafka-dotnet

Copied from the examples folder

    using (var serdeProvider = new AvroSerdeProvider(avroConfig))
    using (var producer = new Producer<string, GenericRecord>(
        producerConfig,
        serdeProvider.GetSerializerGenerator<string>(),
        serdeProvider.GetSerializerGenerator<GenericRecord>()))
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 0;
        string text;
        while ((text = Console.ReadLine()) != "q")
        {
            // s is the Avro RecordSchema defined earlier in the example (not shown in this excerpt).
            var record = new GenericRecord(s);
            record.Add("name", text);
            record.Add("favorite_number", i++);
            record.Add("favorite_color", "blue");

            producer
                .ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                // Log the delivery result or the error once the send completes.
                .ContinueWith(task => Console.WriteLine(task.IsFaulted
                    ? $"error producing message: {task.Exception.Message}"
                    : $"produced to: {task.Result.TopicPartitionOffset}"));
        }
    }

In your case, update the record.Add calls to match the fields of your class.
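
For the Plant class from the question, those record.Add calls might look roughly like the sketch below. It assumes the Apache.Avro NuGet package; the schema JSON, the Example namespace, and the PlantRecordMapper helper are hypothetical names introduced for illustration, and the schema would need to match whatever is registered for your topic.

```csharp
using System.Collections.Generic;
using System.Linq;
using Avro;
using Avro.Generic;

public class Plant
{
    public long Id { get; set; }
    public string Name { get; set; }
    public List<PlantProperties> PlantProperties { get; set; }
}

public class PlantProperties
{
    public long Leaves { get; set; }
    public string Color { get; set; }
}

// Hypothetical helper: maps a Plant to a GenericRecord field by field.
public static class PlantRecordMapper
{
    // Assumed schema for illustration; strings are non-nullable here,
    // so Name and Color must not be null when the record is serialized.
    private const string PlantSchemaJson = @"{
      ""type"": ""record"",
      ""name"": ""Plant"",
      ""namespace"": ""Example"",
      ""fields"": [
        { ""name"": ""Id"", ""type"": ""long"" },
        { ""name"": ""Name"", ""type"": ""string"" },
        { ""name"": ""PlantProperties"", ""type"": { ""type"": ""array"", ""items"": {
            ""type"": ""record"",
            ""name"": ""PlantProperties"",
            ""fields"": [
              { ""name"": ""Leaves"", ""type"": ""long"" },
              { ""name"": ""Color"", ""type"": ""string"" }
            ] } } }
      ]
    }";

    public static readonly RecordSchema PlantSchema =
        (RecordSchema)Schema.Parse(PlantSchemaJson);

    public static GenericRecord ToGenericRecord(Plant plant)
    {
        // Look up the nested record schema used for the array items.
        var arraySchema = (ArraySchema)PlantSchema["PlantProperties"].Schema;
        var itemSchema = (RecordSchema)arraySchema.ItemSchema;

        // Each list element becomes its own GenericRecord; the generic
        // Avro writer expects array fields as a System.Array, hence ToArray().
        var properties = (plant.PlantProperties ?? new List<PlantProperties>())
            .Select(p =>
            {
                var item = new GenericRecord(itemSchema);
                item.Add("Leaves", p.Leaves);
                item.Add("Color", p.Color);
                return item;
            })
            .ToArray();

        var record = new GenericRecord(PlantSchema);
        record.Add("Id", plant.Id);
        record.Add("Name", plant.Name);
        record.Add("PlantProperties", properties);
        return record;
    }
}
```

The returned record can then be passed as the Value of a Message<string, GenericRecord>, as in the producer loop above. Every new property has to be added both to the schema and to the mapper, which is the maintenance cost that generated classes avoid.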


However, since you already have a class, you should try to use SpecificRecord rather than converting back and forth between Avro and a .NET class via a GenericRecord. See the README section on the AvroGen tool for examples of this.

[1] I'm not aware of an alternative .NET library.

Upvotes: 2
