Kiran Gali

Reputation: 111

Avro bytes from Event Hub cannot be deserialized with PySpark

We are sending Avro data encoded with azure.schemaregistry.encoder.avroencoder to Event Hubs from a standalone Python producer job, and we can deserialize it with the same encoder in another standalone Python consumer. The schema registry client is supplied to the Avro encoder in this case as well.

This is the standalone producer I use:

import os
from azure.eventhub import EventHubProducerClient, EventData
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder
from azure.identity import DefaultAzureCredential


os.environ["AZURE_CLIENT_ID"] = ''
os.environ["AZURE_TENANT_ID"] = ''
os.environ["AZURE_CLIENT_SECRET"] = ''
token_credential = DefaultAzureCredential()

fully_qualified_namespace = ""
group_name = "testSchemaReg"
eventhub_connection_str = ""
eventhub_name = ""

definition = """
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}"""

# auto_register=True registers the schema under the group if it isn't already there
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name, auto_register=True)

eventhub_producer = EventHubProducerClient.from_connection_string(
    conn_str=eventhub_connection_str,
    eventhub_name=eventhub_name
)

with eventhub_producer, avro_encoder:
    event_data_batch = eventhub_producer.create_batch()
    dict_content = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"}
    # encode() registers/looks up the schema and returns an EventData whose
    # content_type carries the schema ID
    event_data = avro_encoder.encode(dict_content, schema=definition, message_type=EventData)
    event_data_batch.add(event_data)
    eventhub_producer.send_batch(event_data_batch)

I was able to deserialize it using the standalone consumer below:

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder

# Same schema registry client and encoder as on the producer side
schema_registry_client = SchemaRegistryClient(fully_qualified_namespace, token_credential)
avro_encoder = AvroEncoder(client=schema_registry_client, group_name=group_name)


async def on_event(partition_context, event):
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    print("message type is:")
    print(type(event))
    # decode() reads the schema ID from the EventData content_type and fetches the schema
    dec = avro_encoder.decode(event)
    print("decoded msg:\n")
    print(dec)
    await partition_context.update_checkpoint(event)


async def main():
    client = EventHubConsumerClient.from_connection_string(
        conn_str="connection str",
        consumer_group="$Default",
        eventhub_name="topic name")
    async with client:
        await client.receive(on_event=on_event, starting_position="-1")


asyncio.run(main())

As a next step, I replaced the standalone Python consumer with a PySpark consumer running in a Synapse notebook. Below are the problems I faced:

  1. The from_avro function in Spark is not able to deserialize the Avro messages encoded with the Azure encoder.

  2. As a workaround, I tried creating a UDF that uses the Azure encoder, but the encoder expects the event to be of type EventData, whereas when Spark reads the data through the Event Hubs API we get the body as a byte array (a sketch of how I read the stream and apply the UDF follows this list).

@udf
def decode(row_msg):
    encoder = AvroEncoder(client=schema_registry_client)
    return encoder.decode(bytes(row_msg))

  3. I don't see any proper documentation on a deserializer that can be used with Spark or any other distributed system; all the examples are for standalone clients. Do we have any connector that we can use with Spark/Flink?
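
For reference, this is roughly how I read the stream in the Synapse notebook and apply that UDF. It is only a sketch: the connection string is a placeholder and I'm assuming the azure-event-hubs-spark ("eventhubs") connector.

from pyspark.sql.functions import col

conf = {
    "eventhubs.connectionString":
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt("connection str")
}

df = spark.readStream.format("eventhubs").options(**conf).load()
# The Avro payload arrives in the binary "body" column, not as an EventData object
decoded_df = df.select(decode(col("body")).alias("decoded"))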

Upvotes: 1

Views: 832

Answers (2)

Kiran Gali

Reputation: 111

Answering my own question: the Azure Event Hubs schema registry doesn't currently support Spark or any other distributed system.

They are working on it and trying to add this support to the Spark connector: https://github.com/Azure/azure-event-hubs-spark/pull/573

Upvotes: 1

swathipil-msft

Reputation: 101

  1. Because the Avro schema is not part of the payload ("the data"), the from_avro function in Spark will not be able to deserialize the message. This is expected.

  2. In order to decode, you also need to pass the associated content_type value from the EventData object into the decode method. This content_type value holds the schema ID that is used to retrieve the schema for deserialization. You can set content_type along with content in the MessageContent dict (see the sketch after this list). This sample should be helpful.

  3. We currently don't have a connector that can be used with Spark/Flink. However, if this is something you're interested in, please feel free to file a feature request issue here: https://github.com/Azure/azure-sdk-for-python/issues.
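
As a rough illustration of point 2, here is a minimal sketch of decoding raw bytes together with their content type using the AvroEncoder. The variables raw_bytes and content_type are hypothetical: they stand for the Avro-encoded body and the "avro/binary+<schema ID>" content type captured from the original event.

from azure.identity import DefaultAzureCredential
from azure.schemaregistry import SchemaRegistryClient
from azure.schemaregistry.encoder.avroencoder import AvroEncoder

schema_registry_client = SchemaRegistryClient("<namespace>.servicebus.windows.net", DefaultAzureCredential())
avro_encoder = AvroEncoder(client=schema_registry_client, group_name="testSchemaReg")

# MessageContent is a dict holding the payload plus the content type that carries the schema ID
message_content = {"content": raw_bytes, "content_type": content_type}
decoded_dict = avro_encoder.decode(message_content)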

Upvotes: 0
