Reputation: 165
I'm trying to send XML files (less than 100 KB each) to an Azure Event Hub and then, after sending them, read the events in Databricks.
So far I have used the Python SDK to send the content of the XML as bytes (this step WORKS). The next step I would like to achieve is reading that XML content from the "body" of the event and creating a Spark DataFrame using PySpark.
To be able to do this, I have a few questions:
1- Is there any option I can specify in spark.readStream to tell it that the content of the "body" of the event is XML?
2- Is there any alternative to dump that content directly into a Spark DataFrame?
3- Am I missing some configuration when sending the XML as events?
I was trying something like the example below:
Python event producer
# this is the python event hub message producer
import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import xml.etree.ElementTree as ET
from lxml import etree
from pathlib import Path
connection_str = "Endpoint_str"
eventhub_name = "eventhub_name"
xml_path = Path("path/to/xmlfile.xml")
xml_data = ET.parse(xml_path)
tree = xml_data.getroot()
data = ET.tostring(tree)
async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_str, eventhub_name=eventhub_name)
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData(data))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())
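As a side note, on Python 3.7+ the event loop boilerplate at the end can be replaced with asyncio.run. A sketch (the file paths below are placeholders) that batches several XML files into a single send could look like this:
import asyncio
import xml.etree.ElementTree as ET
from pathlib import Path

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def send_xml_files(paths):
    # Same connection_str and eventhub_name as above.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=connection_str, eventhub_name=eventhub_name)
    async with producer:
        batch = await producer.create_batch()
        for path in paths:
            # Serialize each XML document to bytes and add it as one event.
            batch.add(EventData(ET.tostring(ET.parse(path).getroot())))
        await producer.send_batch(batch)

# Placeholder paths -- adjust to the real XML files.
asyncio.run(send_xml_files([Path("file1.xml"), Path("file2.xml")]))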
Event reader
stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()
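For reference, event_hub_conf is not shown above; it is roughly the standard setup for the eventhubs source, something like the sketch below (namespace and hub name are placeholders, and the connection string has to be encrypted with the connector's helper class):
# Options dict expected by the 'eventhubs' source; the connection string
# must be encrypted with the connector's EventHubsUtils helper.
connection_str = "Endpoint=sb://<namespace>.servicebus.windows.net/;...;EntityPath=<eventhub_name>"

event_hub_conf = {
    'eventhubs.connectionString':
        sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_str)
}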
Thanks!!!
Upvotes: 1
Views: 1599
Reputation: 165
So I finally came up with the following approach to read the XML from the Event Hub body.
First I use the xml.etree.ElementTree library (imported as ET) to parse the XML structure.
stream_data = spark \
    .readStream \
    .format('eventhubs') \
    .options(**event_hub_conf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load() \
    .select("body")
df = stream_data.withColumn("body", stream_data["body"].cast("string"))
import xml.etree.ElementTree as ET
import json

def returnV(col):
    elem_dict = {}
    tag_list = [
        './TAG/Document/id',
        './TAG/Document/car',
        './TAG/Document/motor',
        './Metadata/Date']
    root = ET.fromstring(col)
    for tag in tag_list:
        for item in root.findall(tag):
            elem_dict[item.tag] = item.text
    return json.dumps(elem_dict)
I had some nested tags, and with this method I'm extracting all the needed values and returning them as JSON. What I have learned is that Structured Streaming is not the solution if the incoming schema can change, so I took only those values that I know are not going to change over time.
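As an illustration, for an event body shaped roughly like the (made-up) XML below, which matches the tag paths in tag_list, calling the function directly shows the JSON it produces:
# Made-up sample document; only the tag paths from tag_list matter here.
sample_xml = """
<Root>
  <TAG>
    <Document>
      <id>1</id>
      <car>Opel</car>
      <motor>1.6</motor>
    </Document>
  </TAG>
  <Metadata>
    <Date>2021-05-01</Date>
  </Metadata>
</Root>
"""

print(returnV(sample_xml))
# prints: {"id": "1", "car": "Opel", "motor": "1.6", "Date": "2021-05-01"}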
Then, once the method is defined, I register it as a UDF.
from pyspark.sql.functions import udf

extractValuesFromXML = udf(returnV)
XML_DF = df.withColumn("body", extractValuesFromXML("body"))
Then finally I just use the get_json_object function to extract the values from the JSON.
from pyspark.sql.functions import get_json_object

input_parsed_df = XML_DF.select(
    get_json_object("body", "$.id").cast('integer').alias("id"),
    get_json_object("body", "$.car").alias("car"),
    get_json_object("body", "$.motor").alias("motor"),
    get_json_object("body", "$.Date").alias("Date")
)
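Note that to actually see the parsed columns, the stream still has to be started with a sink. A minimal sketch using the in-memory sink (the query name parsed_xml is arbitrary):
query = input_parsed_df \
    .writeStream \
    .format("memory") \
    .queryName("parsed_xml") \
    .outputMode("append") \
    .start()

# Once a few micro-batches have been processed, inspect the results:
spark.sql("SELECT * FROM parsed_xml").show(truncate=False)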
Upvotes: 1