I have a Spark Structured Streaming application connected to ActiveMQ. The application receives messages from a topic, and each message is an XML string. I want to extract information from this nested XML. How can I do this?
I referred to this post, but was not able to implement something similar in Scala.
XML Format:
<CofiResults>
<ExecutionTime>20201103153839</ExecutionTime>
<FilterClass>S </FilterClass>
<InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
<HeaderSegment xmlns="https://somelink.com">
<Version>6</Version>
<SequenceNb>1</SequenceNb>
</HeaderSegment>
.
.
.
My Code:
import spark.implicits._ // needed for the $"column" syntax below

val df = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("brokerUrl", brokerUrl_)
  .option("topic", topicName_)
  .option("persistence", "memory")
  .option("cleanSession", "true")
  .option("username", username_)
  .option("password", password_)
  .load()

// Cast the raw payload column to a string and keep the column name "payload"
val payload_ = df.select($"payload".cast("string").as("payload")) // This payload IS the XML string
Now I need to extract ExecutionTime, Version, and other fields from the above XML.
You can use Spark SQL's built-in XPath functions (xpath, xpath_string, xpath_long, xpath_int and the like) to extract data from a nested XML structure.
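To try these functions without a broker, here is a minimal batch sketch that applies them to a static, one-row DataFrame. The object name XPathBatchDemo, the sample string and the column aliases are hypothetical; the sample XML is simplified (attributes and namespaces dropped) in the same way as the example below, and a local Spark 3.x session is assumed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical demo object: runs Spark's built-in xpath functions on a static,
// one-row DataFrame so they can be tried without an MQTT/ActiveMQ broker.
object XPathBatchDemo {
  // Simplified sample payload (attributes and namespaces omitted).
  val sampleXml: String =
    """<CofiResults>
      |  <ExecutionTime>20201103153839</ExecutionTime>
      |  <FilterClass>S</FilterClass>
      |  <InputData><ns2><HeaderSegment>
      |    <Version>6</Version>
      |    <SequenceNb>1</SequenceNb>
      |  </HeaderSegment></ns2></InputData>
      |</CofiResults>""".stripMargin

  // Expects a DataFrame with a string column named "payload".
  def extract(df: DataFrame): DataFrame =
    df.selectExpr(
      // xpath returns array<string>, one element per matching node
      "xpath(payload, '//SequenceNb/text()') AS sequenceNbArray",
      // the typed variants return a single scalar value
      "xpath_long(payload, '/CofiResults/ExecutionTime/text()') AS executionTime",
      "xpath_string(payload, '/CofiResults/FilterClass/text()') AS filterClass",
      "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') AS version")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("xpath-batch-demo").getOrCreate()
    import spark.implicits._
    // A one-row static DataFrame stands in for the streaming source
    extract(Seq(sampleXml).toDF("payload")).show(false)
    spark.stop()
  }
}
```

Because selectExpr behaves the same on batch and streaming DataFrames, extract could be applied to the streaming payload column from the question once it has been cast to a string.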
Given a nested XML document like the following (for simplicity, I have omitted the tag attributes and replaced the namespaced ns2:FrdReq element with a plain ns2 element):
<CofiResults>
<ExecutionTime>20201103153839</ExecutionTime>
<FilterClass>S</FilterClass>
<InputData>
<ns2>
<HeaderSegment>
<Version>6</Version>
<SequenceNb>1</SequenceNb>
</HeaderSegment>
</ns2>
</InputData>
</CofiResults>
you can then use those SQL functions directly in your selectExpr statement (no createOrReplaceTempView needed), as below:
// df is the streaming DataFrame returned by load() in the question
val parsed = df
  .selectExpr("CAST(payload AS STRING) AS payload")
  .selectExpr(
    "xpath(payload, '/CofiResults/ExecutionTime/text()') AS ExecutionTimeAsArryString",
    "xpath_long(payload, '/CofiResults/ExecutionTime/text()') AS ExecutionTimeAsLong",
    "xpath_string(payload, '/CofiResults/ExecutionTime/text()') AS ExecutionTimeAsString",
    "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') AS VersionAsInt")
Remember that the xpath function returns an array of strings, whereas xpath_string, xpath_long and xpath_int return a single value, which you may find more convenient. Applying the code above in Spark 3.0.1 with a console sink produces:
+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839] |20201103153839 |20201103153839 |6 |
+-------------------------+-------------------+---------------------+------------+
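The console sink that produced this output is not shown above. A minimal sketch of that wiring follows, assuming df is the MQTT streaming DataFrame from the question; the object XPathConsoleSink and its methods parse and startConsole are hypothetical names, and only two of the columns are kept for brevity.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: the xpath projection plus a console sink.
object XPathConsoleSink {
  // `source` is assumed to have a "payload" column (binary or string), e.g. the MQTT stream.
  def parse(source: DataFrame): DataFrame =
    source
      .selectExpr("CAST(payload AS STRING) AS payload")
      .selectExpr(
        "xpath_long(payload, '/CofiResults/ExecutionTime/text()') AS ExecutionTimeAsLong",
        "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') AS VersionAsInt")

  // Prints every micro-batch to stdout without truncating values; needs a streaming DataFrame.
  def startConsole(parsed: DataFrame): StreamingQuery =
    parsed.writeStream
      .format("console")
      .option("truncate", "false")
      .outputMode("append")
      .start()
}
```

Usage with the question's df would be XPathConsoleSink.startConsole(XPathConsoleSink.parse(df)).awaitTermination(). Keeping parse separate from the sink means the same projection can also be checked on a static DataFrame.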