I'm consuming XML messages from a Kafka topic. Can anyone tell me how to parse the XML into a DataFrame?
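import org.apache.spark.sql.functions.col

// Read a static XML file with spark-xml; each element matching rowTag becomes one row
val df = sqlContext.read
  .format("com.databricks.spark.xml")
  // Only one rowTag applies per read ("ns:header" was also tried here).
  // A Map with two "rowTag" keys keeps only the last one.
  .option("rowTag", "ntfyTrnsDt:notifyTransactionDetailsReq")
  .load("/home/ubuntu/SourceXML.xml")

df.show()
df.printSchema()
// Nested elements are addressed with dot notation
df.select(col("ns:header.ns:captureSystem")).show()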
I am able to extract the information from the XML file. What I don't know is how to pass, convert, or load the RDD[String] coming from the Kafka topic into the SQL read API.
Thanks!
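If using spark-xml on streaming data is inconvenient, one alternative (a different technique from spark-xml, shown only as a sketch) is to parse each message string yourself inside `rdd.map` and build the DataFrame from the resulting rows. The Python sketch below uses only the standard library `xml.etree.ElementTree`. It assumes every Kafka message is one complete XML document, and it flattens nested elements into dotted keys made of local names (namespace URIs are dropped), so `ns:header/ns:captureSystem` becomes `header.captureSystem`. The names `flatten_xml` and `local_name` are hypothetical.

```python
import xml.etree.ElementTree as ET


def local_name(tag):
    # ElementTree renders prefixed tags as "{namespace-uri}localName"; keep only localName
    return tag.rsplit("}", 1)[-1]


def flatten_xml(xml_text):
    """Hypothetical helper: turn one XML message into a flat dict of dotted paths -> text.

    Leaf elements become keys such as "header.captureSystem"; the root element itself
    is not part of the key. Repeated sibling elements overwrite each other in this sketch.
    """
    root = ET.fromstring(xml_text)
    row = {}

    def walk(elem, prefix):
        for child in elem:
            key = prefix + local_name(child.tag)
            if len(child):  # element has children of its own: recurse
                walk(child, key + ".")
            else:
                row[key] = (child.text or "").strip()

    walk(root, "")
    return row
```

In PySpark, `rdd.map(flatten_xml)` would give an RDD of dicts that can then be turned into a DataFrame (for example via `pyspark.sql.Row`); if messages can have different fields, a fixed schema is needed first.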
I am facing the same situation. Doing some research, I found that some people use the following approach to convert the RDD to a DataFrame, as shown here:
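import com.databricks.spark.xml.XmlReader
// Wrap each message in a dummy root element, then let spark-xml infer the schema
val wrapped = rdd.map(xml => s"""<a>$xml</a>""")
val df = new XmlReader().xmlRdd(sqlContext, wrapped)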
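The wrapping step is plain string manipulation. One thing to watch (an assumption about typical Kafka payloads, not something the linked approach mentions): if each message starts with an XML declaration such as `<?xml version="1.0"?>`, wrapping it verbatim inside `<a>` is not well-formed XML, because a declaration may only appear at the very start of a document. Below is a Python sketch of the same wrapping that strips a leading declaration first; `wrap_message` and `is_well_formed` are hypothetical names, and the `<a>` root tag is taken from the snippet above.

```python
import re
import xml.etree.ElementTree as ET

# Matches an optional leading XML declaration, e.g. <?xml version="1.0" encoding="UTF-8"?>
_XML_DECL = re.compile(r"^\s*<\?xml\s[^>]*\?>")


def wrap_message(xml_text, root_tag="a"):
    """Hypothetical helper: drop a leading XML declaration and wrap the payload in <root_tag>."""
    body = _XML_DECL.sub("", xml_text, count=1)
    return "<{0}>{1}</{0}>".format(root_tag, body)


def is_well_formed(xml_text):
    # True if ElementTree can parse the string as a single XML document
    try:
        ET.fromstring(xml_text)
        return True
    except ET.ParseError:
        return False
```

In PySpark the equivalent of the Scala `rdd.map` line would be `rdd.map(wrap_message)`.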
You just have to obtain the RDD from the DStream. I am doing this in PySpark:
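# Each batch interval yields one RDD of text lines from files newly added under the path
streamElement = ssc.textFileStream("s3n://your_path")
# Call process(time, rdd) on every micro-batch RDD
streamElement.foreachRDD(process)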
where the process function has the following structure, so you can do whatever you need with each RDD:
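def process(time, rdd):
    # time: the batch time; rdd: the records received in this batch interval
    if not rdd.isEmpty():
        pass  # convert rdd to a DataFrame and work with it here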