Reputation: 3
I am trying to integrate with salesforces new grpc change data capture event bus. Events are sent to clients via grpc with an avro encoded message of what the changes to the records were, so the client has to decode the message using an avro schema that is provided and unable to be changed.
I am able to easily decode the avro encoded message for objects with fields of two union types but fields with three types throw exceptions.
This is the Name field for the Account avro schema:
{
"name": "Name",
"type": [
"null",
"string",
{
"type": "record",
"name": "Switchable_PersonName",
"fields": [
{
"name": "Salutation",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "FirstName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "LastName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "MiddleName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "InformalName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "Suffix",
"type": [
"null",
"string"
],
"default": null
}
]
}
],
"doc": "Data:Switchable_PersonName",
"default": null
},
As you can see the name can either be null, a string, or an object called Switchable_PersonName.
Using the avrogen cli tool, I was able to convert the avro schema into concrete c# classes of AccountChangeEvent, ChangeEventHeader, ChangeType, Address, Switchable_PersonName. The Name field was created in the AccountChangeEvent class as:
private object _Name;
This is the method I made for decoding the avro message:
public static void DeserializeAccountConcrete(byte[] payload)
{
var accSchema = Avro.Schema.Parse(File.ReadAllText("./avro/AccountGRPCSchema.avsc"));
var unionSchema = accSchema as Avro.UnionSchema;
var cache = new ClassCache();
cache.LoadClassCache(typeof(AccountChangeEvent), unionSchema);
cache.LoadClassCache(typeof(Switchable_PersonName), unionSchema);
cache.LoadClassCache(typeof(Address), unionSchema);
var reader = new ReflectReader<AccountChangeEvent>(accSchema, accSchema, cache);
using var accStream = new MemoryStream(payload);
accStream.Seek(0, SeekOrigin.Begin);
var accDecoder = new BinaryDecoder(accStream);
var accEvent = reader.Read(accDecoder);
Console.WriteLine(accEvent.Name);
Console.WriteLine("Event " + accEvent.ChangeEventHeader.changeType);
}
This sort of deserialization works for other schemas but it fails for the Account schema with this exception being thrown.
Avro.AvroException: Class for union record type com.sforce.eventbus.Switchable_PersonName is not registered.Create a ClassCache object and call LoadClassCache
Looking at the documentation for avro my implementation looks correct but it seems it is not.
I have changed the field type to
private com.sforce.eventbus.Switchable_PersonName _Name;
and any other code that may rely on this field but the same error is still thrown. I am new to avro so there may be many things that I do not know or am doing wrong.
Upvotes: 0
Views: 984
Reputation: 3
I believe I found a solution. Before, I was loading only the account avro schema and casting it as the union schema which is not what I needed. I found that I needed to load in all the related schemas for every record type and load it into the cache like this:
var accSchema = Avro.Schema.Parse(File.ReadAllText("./avro/AccountGRPCSchema.avsc"));
var changeEventHeaderSchema = Avro.Schema.Parse(File.ReadAllText("./avro/ChangeEventHeaderSchema.avsc"));
var nameSchema = Avro.Schema.Parse(File.ReadAllText("./avro/SwitchablePersonNameSchema.avsc"));
var addressSchema = Avro.Schema.Parse(File.ReadAllText("./avro/AddressSchema.avsc"));
var cache = new ClassCache();
cache.LoadClassCache(typeof(ChangeEventHeader), changeEventHeaderSchema);
cache.LoadClassCache(typeof(Switchable_PersonName), nameSchema);
cache.LoadClassCache(typeof(Address), addressSchema);
After that I could deserialize the data into proper objects and read them using a ReflectReader.
var reader = new ReflectReader<AccountChangeEvent>(accSchema, accSchema, cache);
using var accStream = new MemoryStream(payload);
accStream.Seek(0, SeekOrigin.Begin);
var accDecoder = new BinaryDecoder(accStream);
var accEvent = reader.Read(accDecoder);
if(accEvent.Name is PersonName && accEvent.Name is not null)
{
var personName = accEvent.Name as Switchable_PersonName;
Console.WriteLine(personName.FirstName + " " + personName.LastName);
} else
{
Console.WriteLine(accEvent.Name?.ToString());
}
Upvotes: 0