Reputation: 111
We are sending Avro data encoded with azure.schemaregistry.encoder.avroencoder to Event Hub from a standalone Python job, and we can deserialize it with the same encoder in another standalone Python consumer. The Schema Registry client is supplied to the Avro encoder in this case.
This is the standalone producer I use:
import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential

os.environ["AZURE_CLIENT_ID"] = ''
os.environ["AZURE_TENANT_ID"] = ''
os.environ["AZURE_CLIENT_SECRET"] = ''
token_credential = DefaultAzureCredential()

fully_qualified_namespace = ""
group_name = "testSchemaReg"
eventhub_connection_str = ""
eventhub_name = ""

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)
eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)
I was able to deserialize it using this standalone consumer:
import asyncio
from azure.eventhub.aio import EventHubConsumerClient

# schema_registry_client and avro_encoder are set up exactly as in the producer
async def on_event(partition_context, event):
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    print("message type is:")
    print(type(event))
    dec = avro_encoder.decode(event)
    print("decoded msg:\n")
    print(dec)
    await partition_context.update_checkpoint(event)

async def main():
    client = EventHubConsumerClient.from_connection_string(
        conn_str="connection str",
        consumer_group="$Default",
        eventhub_name="")
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")

asyncio.run(main())
As a next step, I replaced the standalone Python consumer with a PySpark consumer running in a Synapse notebook. Below is the decode UDF where I ran into problems:
from pyspark.sql.functions import udf

@udf
def decode(row_msg):
    # creates a new encoder per call and decodes the raw payload bytes
    encoder = AvroEncoder(client=schema_registry_client)
    return encoder.decode(bytes(row_msg))
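For context, this is roughly how I apply the UDF to the stream read with the azure-event-hubs-spark connector (a minimal sketch; the connection string, eh_conf, and column names below are placeholders/assumptions rather than my exact notebook code):

from pyspark.sql.functions import col

# Assumed Event Hubs connector configuration (placeholder values)
connection_string = "<event hub connection string>;EntityPath=<event hub name>"
eh_conf = {
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string)
}

# The Avro-encoded payload arrives in the binary "body" column
df = spark.readStream.format("eventhubs").options(**eh_conf).load()
decoded_df = df.withColumn("decoded", decode(col("body")))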
Upvotes: 1
Views: 832
Reputation: 111
Answering my own question: the Azure Event Hubs Schema Registry doesn't support Spark or any other distributed system yet.
They are working on it and trying to add this support to Spark: https://github.com/Azure/azure-event-hubs-spark/pull/573
Upvotes: 1
Reputation: 101
Because the Avro schema is not part of the payload ("the data"), the from_avro function in Spark will not be able to deserialize the message. This should be expected.
In order to decode, you also need to pass the associated content_type value on the EventData object into the decode method. This content_type value holds the schema ID that will be used to retrieve the schema used for deserialization. You can set content_type along with content in the MessageContent dict. This sample should be helpful.
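As a rough sketch of that pattern (the client/encoder setup and placeholder values below are assumptions, not taken from the question):

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder

# Assumed setup: same namespace and schema group as on the producer side
schema_registry_client = SchemaRegistryClient("<fully qualified namespace>", DefaultAzureCredential())
avro_encoder = AvroEncoder(client=schema_registry_client, group_name="testSchemaReg")

def decode_payload(payload: bytes, content_type: str):
    # content_type carries the schema ID, so the encoder can look up the schema
    # in the registry and deserialize the raw Avro bytes in "content"
    message_content = {"content": payload, "content_type": content_type}
    return avro_encoder.decode(message_content)

# With an EventData object the same information is available directly:
# decoded = decode_payload(b"".join(event.body), event.content_type)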
We currently don't have a connector to be used with Spark/Flink. However, if this is something you're interested in, please feel free to file a feature-request issue here: https://github.com/Azure/azure-sdk-for-python/issues.
Upvotes: 0