Vamsi

Reputation: 69

Spark Streaming XML files

I have a requirement to process XML files streamed into an S3 folder. Currently, I have implemented it as follows.

First, read the files using Spark's fileStream:

val data = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "s3://myfolder/",
    (t: org.apache.hadoop.fs.Path) => true, // path filter: accept every file
    newFilesOnly = true,
    hadoopConf
  ).map(_._2.toString())

For each RDD in the stream, check whether any file has been read (this and the steps below run inside foreachRDD, since calling count() on the DStream itself just returns another DStream):

data.foreachRDD { rdd =>
  if (rdd.count() != 0) {

Write the strings to a new HDFS directory:

    rdd.coalesce(1).saveAsTextFile(sdir)

Create a DataFrame by reading back from that HDFS directory:

    val loaddata = sqlContext.read.format("com.databricks.spark.xml")
      .option("rowTag", "Trans")
      .load(sdir)

Do some processing on the DataFrame and save it as JSON:

    loaddata.write.mode("append").json("s3://mybucket/somefolder")
  }
}

Somehow, I feel that the above approach is very inefficient and, frankly, quite schoolboyish. Is there a better solution? Any help would be greatly appreciated.

A follow-up question: how do I manipulate fields (not columns) in a DataFrame? I have a very complex nested XML, and when I use the method described above, I get a DataFrame with 9 columns and 50-odd inner struct arrays. That is fine, except that I need to trim certain field names. Is there a way to achieve that without exploding the DataFrame, given that I need to reconstruct the same structure afterwards?
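One way this can be approached without exploding anything (a sketch, assuming the only change needed is trimming whitespace from field names; renameFields is an illustrative helper, not part of Spark's API) is to rewrite the schema recursively and cast each top-level column to the renamed type, since struct-to-struct casts in Spark match fields by position:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Recursively rebuild a data type, applying `rename` to every struct field name.
def renameFields(dt: DataType, rename: String => String): DataType = dt match {
  case st: StructType =>
    StructType(st.fields.map(f =>
      StructField(rename(f.name), renameFields(f.dataType, rename), f.nullable)))
  case at: ArrayType =>
    at.copy(elementType = renameFields(at.elementType, rename))
  case other => other
}

// Cast each top-level column to its renamed type; the data is untouched,
// only the schema's field names change.
val trimmed = loaddata.select(loaddata.schema.fields.map { f =>
  col(f.name).cast(renameFields(f.dataType, _.trim)).as(f.name.trim)
}: _*)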

Upvotes: 3

Views: 2496

Answers (1)

user6022341

If you use Spark 2.0, you may be able to make it work with Structured Streaming:

val inputDF = spark.readStream.format("com.databricks.spark.xml")
  .option("rowTag", "Trans")
  .load(path)
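If that does work, the write side might look like the sketch below (the paths are placeholders, and the checkpointLocation option is required for any streaming sink):

val query = inputDF.writeStream
  .format("json")
  .option("path", "s3://mybucket/somefolder")
  .option("checkpointLocation", "s3://mybucket/checkpoints") // placeholder path
  .outputMode("append")
  .start()

query.awaitTermination()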

Upvotes: 4
