WinterSoldier

Reputation: 413

How to extract information from a nested XML string in Spark Structured Streaming

I have a Spark Structured Streaming application connected to ActiveMQ. The application receives messages from a topic, and each message is an XML document delivered as a string. How can I extract fields from this nested XML?

I referred to this post, but was not able to implement something similar in Scala.

XML Format:

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S </FilterClass>
  <InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
    <HeaderSegment xmlns="https://somelink.com">
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
    </HeaderSegment>
.
.
.

My Code:

val df = spark.readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl", brokerUrl_)
            .option("topic", topicName_)
            .option("persistence", "memory")
            .option("cleanSession", "true")
            .option("username", username_)
            .option("password", password_)
            .load()

val payload_ = df.select('payload cast "string") // This payload IS the XMLString

Now I need to extract ExecutionTime, Version, and other fields from the above XML.

Upvotes: 1

Views: 327

Answers (1)

Michael Heil

Reputation: 18495

You can use Spark SQL's built-in XPath functions (xpath, xpath_string, xpath_int, xpath_long and so on) to extract data from a nested XML structure.

Given a nested XML like the following (for simplicity, I have omitted the tag attributes)

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <ns2>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </ns2>
  </InputData>
</CofiResults>

you can use those SQL functions directly in a selectExpr call (no createOrReplaceTempView needed), as below:

val extracted = df // df is the streaming DataFrame from the question
  .selectExpr("CAST(payload AS STRING) as payload")
  .selectExpr(
    "xpath(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') as VersionAsInt")

Note that xpath returns an array of strings, whereas the typed variants (xpath_string, xpath_long, xpath_int) return a single value, which is usually more convenient. Running the code above on Spark 3.0.1 with a console sink produces:

+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+
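
The simplified XML above drops the namespaces that the original message carries (the ns2 prefix on FrdReq and the default xmlns on HeaderSegment). A rough sketch of one way to cope with them is to match every element by its local name with the standard XPath 1.0 function local-name(), so the path does not depend on prefixes or namespace bindings. Everything named here is an assumption for illustration: the object XPathNamespaceSketch, the helper localNamePath, the local[1] master, and the sample message, which is a shortened stand-in whose closing tags I made up because the original is truncated. I have not run this against the real feed.

```scala
import org.apache.spark.sql.SparkSession

object XPathNamespaceSketch {

  // Shortened, hypothetical stand-in for one message. The closing tags are
  // assumed, since the XML in the question is cut off.
  val sampleXml: String =
    """<CofiResults>
      |  <ExecutionTime>20201103153839</ExecutionTime>
      |  <InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
      |    <HeaderSegment xmlns="https://somelink.com">
      |      <Version>6</Version>
      |    </HeaderSegment>
      |  </ns2:FrdReq></InputData>
      |</CofiResults>""".stripMargin

  // Hypothetical helper: builds an absolute XPath in which each step matches
  // an element by local name only, ignoring any prefix or namespace.
  def localNamePath(names: String*): String =
    names.map(n => s"/*[local-name()='$n']").mkString

  val versionPath: String =
    localNamePath("CofiResults", "InputData", "FrdReq", "HeaderSegment", "Version") + "/text()"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // assumption: local run just to try the expressions
      .appName("xpath-namespace-sketch")
      .getOrCreate()
    import spark.implicits._

    // Static one-row DataFrame, so the expressions can be tried without a broker.
    val payload = Seq(sampleXml).toDF("payload")

    payload.selectExpr(
      "xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTime",
      // The path contains single quotes, so the SQL literal is double-quoted
      // (assumes the default setting where double quotes delimit strings).
      s"""xpath_int(payload, "$versionPath") as Version"""
    ).show(truncate = false)

    spark.stop()
  }
}
```

The same two expression strings should be usable unchanged in the selectExpr call on the streaming DataFrame, since only the source of the payload column differs. Trying the paths on a static DataFrame first makes it easier to debug an expression that returns null or an empty array.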

Upvotes: 1
