senthil kumar p
senthil kumar p

Reputation: 546

How to parse the streaming XML into dataframe?

I'm consuming the XML file from kafka topic .Can anyone tell me how to parse the XML into dataframe.

val df = sqlContext.read
    .format("com.databricks.spark.xml")
    //.option("rowTag","ns:header")
   // .options(Map("rowTag"->"ntfyTrns:payloadHeader","rowTag"->"ns:header"))
       .option("rowTag","ntfyTrnsDt:notifyTransactionDetailsReq")
    .load("/home/ubuntu/SourceXML.xml")
    df.show
    df.printSchema()
    df.select(col("ns:header.ns:captureSystem")).show()

I able to exact the information information from XML .I dont know how to pass or convert or load the RDD[String] from kafka topic to sql read API.

Thanks!

Upvotes: 0

Views: 1865

Answers (1)

esteban ceron
esteban ceron

Reputation: 11

I am facing the same situation, doing some research I found that some people is using this method to convert the RDD to a DataFrame using the following code as shown here:

val wrapped = rdd.map(xml => s"""<a>$xml</a>""")
val df = new XmlReader().xmlRdd(sqlContext, wrapped)

You just have to obtain the RDD from the DStream, I am doing this using pyspark

streamElement = ssc.textFileStream("s3n://your_path")
streamElement.foreachRDD(process)

where process method has the following structure, so you can do everything with your rdds

def process(time, rdd):
  return value 

Upvotes: 1

Related Questions