Reputation: 69
I have a requirement to process XML files streamed into an S3 folder. Currently, I have implemented it as follows.
First, read the files using Spark's fileStream
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
For each RDD, check if any file has been read
if (data.count() != 0)
Write the string to a new HDFS directory
data.coalesce(1).saveAsTextFile(sdir);
Create a DataFrame by reading from the above HDFS directory
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
Do some processing on the DataFrame and save it as JSON
loaddata.write.mode("append").json("s3://mybucket/somefolder")
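Putting the pieces together, my driver loop currently looks roughly like this (simplified sketch; ssc, sqlContext, hadoopConf and sdir are defined elsewhere, and sdir points to a fresh HDFS directory per batch):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Simplified version of what runs today; error handling omitted.
val data = ssc.fileStream[LongWritable, Text, TextInputFormat](
    "s3://myfolder/", (t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf)
  .map(_._2.toString())

data.foreachRDD { rdd =>
  // Only act when this batch actually picked up new files
  if (rdd.count() != 0) {
    // Stage the raw XML strings in HDFS so spark-xml can read them as a batch source
    rdd.coalesce(1).saveAsTextFile(sdir)

    val loaddata = sqlContext.read.format("com.databricks.spark.xml")
      .option("rowTag", "Trans")
      .load(sdir)

    // ... processing on loaddata ...
    loaddata.write.mode("append").json("s3://mybucket/somefolder")
  }
}

ssc.start()
ssc.awaitTermination()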
Somehow, I feel that the above approach is very inefficient and, frankly, quite schoolboyish. Is there a better solution? Any help would be greatly appreciated.
A follow-up question: how do I manipulate fields (not columns) in a DataFrame? I have a very complex nested XML, and when I use the above method I get a DataFrame with 9 columns and 50-odd inner struct arrays. That is fine, except that I need to trim certain field names. Is there a way to achieve that without exploding the DataFrame, as I need to construct the same structure again?
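To make the follow-up concrete, here is a toy version of the schema issue (the real schema is far bigger; the field names below are made up):

// Toy illustration only -- the real DataFrame comes from spark-xml and is much more nested.
loaddata.printSchema()
// root
//  |-- TransId: string (nullable = true)
//  |-- Details: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- Amount_: string (nullable = true)      <- want this trimmed to "Amount"
//  |    |    |-- Currency_: string (nullable = true)    <- want this trimmed to "Currency"
//
// I want to rename the inner struct fields without exploding Details and
// rebuilding the array, because the output must keep exactly the same shape.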
Upvotes: 3
Views: 2496
Reputation:
If you use Spark 2.0, you may be able to make it work with Structured Streaming:
val inputDF = spark.readStream
  .format("com.databricks.spark.xml")
  .option("rowTag", "Trans")
  .load(path)
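From there you could stream the processed result back out as JSON with the streaming file sink, roughly like this (the paths and checkpoint location are placeholders, and this assumes the XML source above really does support readStream):

// Sketch only; the file sink uses append output mode, which is the default.
val query = inputDF
  // ... your transformations ...
  .writeStream
  .format("json")
  .option("path", "s3://mybucket/somefolder")
  .option("checkpointLocation", "s3://mybucket/checkpoints")
  .start()

query.awaitTermination()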
Upvotes: 4