Reputation: 55
I am sending Avro-serialized messages from Kafka to Azure Event Hubs using SchemaRegistryApacheAvroSerializer
. The schema is stored in Azure Schema Registry, and the producer sends events successfully.
public EventHubProducerService(
@Value("${EVENTHUB_CONNECTION_STRING}") String connectionString,
@Value("${eventhub.name}") String eventHubName,
@Value("${schema-registry.endpoint}") String schemaRegistryEndpoint,
@Value("${schema-registry.group}") String schemaRegistryGroup) {
tokenCredential = new DefaultAzureCredentialBuilder().build();
this.producerClient = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
SchemaRegistryAsyncClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.credential(tokenCredential)
.fullyQualifiedNamespace(schemaRegistryEndpoint)
.buildAsyncClient();
this.schemaRegistryApacheAvroSerializer = new SchemaRegistryApacheAvroSerializerBuilder()
.schemaRegistryClient(schemaRegistryClient)
.schemaGroup(schemaRegistryGroup)
.autoRegisterSchemas(true)
.avroSpecificReader(true)
.buildSerializer();
}
public void sendMessage(AgreementLifecycleDomainSourceType message) {
EventData eventData = schemaRegistryApacheAvroSerializer.serialize(
message, TypeReference.createInstance(EventData.class)
);
SendOptions sendOptions = new SendOptions().setPartitionId("1");
producerClient.send(Collections.singletonList(eventData), sendOptions);
}
When I connect this Event Hub to Microsoft Fabric Eventstream and try to preview the data, I get this error:
Source 'EventHubInputAdapter' had occurrences of kind 'InputDeserializerError.InvalidData'.
Invalid Avro Format.
Any insights would be helpful!
Upvotes: 0
Views: 35
Reputation: 3639
The Invalid Avro Format
error that occurs when trying to deserialize Avro messages in Microsoft Fabric Eventstream from Azure Event Hubs can be resolved by using the following code:
PlayingCard playingCard = new PlayingCard();
playingCard.setPlayingCardSuit(PlayingCardSuit.SPADES);
playingCard.setIsFaceCard(false);
playingCard.setCardValue(5);
MessageContent message = serializer.serialize(playingCard,
TypeReference.createInstance(MessageContent.class));
In this case, change .avroSpecificReader(true)
to .avroSpecificReader(false)
.
Below is a sample code snippet for the Azure Schema Registry Apache Avro Serializer client library for Java with Azure Event Hubs. Refer to this GitHub repository for sample code on serialization and deserialization:
private final EventHubProducerClient producerClient;
private final SchemaRegistryApacheAvroSerializer schemaRegistryApacheAvroSerializer;
private final TokenCredential tokenCredential;
public EventHubProducerService(
String connectionString,
String eventHubName,
String schemaRegistryEndpoint,
String schemaRegistryGroup) {
tokenCredential = new DefaultAzureCredentialBuilder().build();
this.producerClient = new EventHubClientBuilder()
.connectionString(connectionString, eventHubName)
.buildProducerClient();
SchemaRegistryAsyncClient schemaRegistryClient = new SchemaRegistryClientBuilder()
.credential(tokenCredential)
.fullyQualifiedNamespace(schemaRegistryEndpoint)
.buildAsyncClient();
this.schemaRegistryApacheAvroSerializer = new SchemaRegistryApacheAvroSerializerBuilder()
.schemaRegistryClient(schemaRegistryClient)
.schemaGroup(schemaRegistryGroup)
.autoRegisterSchemas(true)
.avroSpecificReader(false)
.buildSerializer();
}
public void sendMessage(GenericRecord message) {
try {
EventData eventData = schemaRegistryApacheAvroSerializer.serialize(
message, TypeReference.createInstance(EventData.class)
);
System.out.println("Serialized Message: " + new String(eventData.getBody()));
SendOptions sendOptions = new SendOptions().setPartitionId("1");
producerClient.send(Collections.singletonList(eventData), sendOptions);
System.out.println("Message successfully sent to Event Hub.");
} catch (Exception e) {
System.err.println("Error sending Avro message: " + e.getMessage());
e.printStackTrace();
}
}
Upvotes: 0