Reputation: 73
I have to process millions of JSON files from Azure Blob Storage, each representing one row, and need to load them into Azure SQL DB with some minimal transformation in between. These files come in at random times but follow the same schema.
My first solution basically just created a DataFrame for each file and pushed it into SQL. This worked when we were receiving hundreds of files, but now that we are receiving millions of files it is not scaling, taking over a day to process.
We also tried processing the files in Scala without Spark (see code below), but this is also too slow: 500 files processed in 8 minutes.
var sql_statement = ""
allFiles.par.map(file_name => {
  //processing
  val json = scala.io.Source.fromFile(file_name).mkString
  val mapData1 = mapper.readValue(json, classOf[Map[String, Any]])
  val account = mapData1("Contact").asInstanceOf[Map[String, Any]]
  val common = account.keys.toList.intersect(srcDestMap.keys.toList)
  val trMap = common.map(rec => Map(srcDestMap(rec) -> account(rec))).flatten.toMap
  val vals = trMap.keys.toList.sorted.map(trMap(_).toString.replace("'", "''")).map("'" + _ + "'")
  //end processing
  val cols = "insert into dbo.Contact_VS(" + trMap.keys.toList.sorted.mkString(",") + ") values (" + vals.mkString(",") + ")"
  sql_statement = sql_statement + cols
})
val updated = statement.executeUpdate(sql_statement)
connection.close()
If anyone knows how to optimize this code, or has any out-of-the-box ideas we could use to preprocess our JSON, it would be greatly appreciated! The JSON is nested, so it's a little more involved to merge everything into one large JSON to be read into Spark, but we may have to go that way if no one has any better ideas.
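For reference, the "read it all into Spark" route we are weighing would look roughly like the sketch below (the storage path, JDBC details, and the Contact.* flattening are placeholders for our actual setup, not working code); since Spark's JSON reader can take a whole directory of single-object files with the multiLine option, we might not even need to merge them first:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("json-to-sql").getOrCreate()

// Placeholder path; each blob holds one (possibly nested, multi-line) JSON object.
val inputPath = "wasbs://<container>@<account>.blob.core.windows.net/contacts/*.json"

val df = spark.read
  .option("multiLine", "true") // each file is a single JSON object spanning multiple lines
  .json(inputPath)

// Minimal transformation: pull the nested Contact fields up to top-level columns.
val flattened = df.select("Contact.*")

// Placeholder JDBC details for Azure SQL DB.
flattened.write
  .format("jdbc")
  .option("url", "jdbc:sqlserver://<server>.database.windows.net;databaseName=<db>")
  .option("dbtable", "dbo.Contact_VS")
  .option("user", "<user>")
  .option("password", "<password>")
  .mode("append")
  .save()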
Upvotes: 1
Views: 1151
Reputation: 4017
Since you already got an answer, let me point out some problems with the raw Scala implementation:
1) building the SQL statements by hand is error-prone and inefficient
2) appending to sql_statement inside the loop is very inefficient (and, since the loop runs in parallel, updating a shared var is not even thread-safe)
3) the level of parallelism of allFiles.par: .par shouldn't be used for blocking tasks, for two reasons:
it uses the global shared thread pool under the hood, so one batch of tasks will block other tasks.
its parallelism level is tuned for CPU-bound tasks (the number of CPU threads); for blocking I/O you want much higher parallelism.
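To make these concrete, here is a rough sketch of an alternative (the connection string, column names, chunk size, pool size, and the extractValues helper are placeholders, not your actual code): it binds values through a parameterized PreparedStatement with JDBC batching instead of concatenating SQL strings, keeps no shared mutable state, and runs the uploads on a dedicated thread pool sized for blocking I/O rather than relying on .par:

import java.sql.DriverManager
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BatchedUpload {
  // Placeholder JDBC URL for Azure SQL DB.
  val jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net;databaseName=<db>;user=<user>;password=<pass>"

  // Dedicated pool sized for blocking I/O instead of the global pool used by .par.
  private val ioPool = Executors.newFixedThreadPool(32)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(ioPool)

  // The column set is fixed since the files share a schema; extractValues stands in
  // for the existing JSON -> column-values mapping logic.
  val columns = List("FirstName", "LastName", "Email")
  def extractValues(fileName: String): List[String] = ???

  def uploadChunk(fileNames: Seq[String]): Unit = {
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val placeholders = columns.map(_ => "?").mkString(",")
      val stmt = conn.prepareStatement(
        s"insert into dbo.Contact_VS(${columns.mkString(",")}) values ($placeholders)")
      fileNames.foreach { f =>
        extractValues(f).zipWithIndex.foreach { case (v, i) => stmt.setString(i + 1, v) }
        stmt.addBatch() // values are bound as parameters, so no manual quoting/escaping
      }
      stmt.executeBatch() // one round trip per chunk instead of one per row
    } finally conn.close()
  }

  def run(allFiles: Seq[String]): Unit = {
    // Each chunk of files becomes an independent task on the dedicated pool.
    val work = allFiles.grouped(1000).toList.map(chunk => Future(uploadChunk(chunk)))
    Await.result(Future.sequence(work), Duration.Inf)
    ioPool.shutdown()
  }
}

Chunk and pool sizes are a trade-off: larger chunks mean fewer round trips but longer transactions, so the numbers above are only a starting point.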
Upvotes: 0
Reputation: 933
You are close; Spark contains some helper functions to parallelize tasks across the cluster. Note that you will want to set "spark.default.parallelism" to a sane number so that you're not creating too many connections to your DB.
def loadFileAndUploadToRDS(filepath: String): Unit = ???

@Test
def parallelUpload(): Unit = {
  val files = List("s3://bucket/path" /** more files **/)
  spark.sparkContext.parallelize(files).foreach(filepath => loadFileAndUploadToRDS(filepath))
}
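For example, one way to cap the number of concurrent DB connections (the value 32 below is only illustrative) is to set the parallelism when the session is built, or to pass an explicit slice count to parallelize so each partition maps to roughly one connection:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallel-upload")
  .config("spark.default.parallelism", "32") // illustrative cap on the default task count
  .getOrCreate()

val files = List("s3://bucket/path" /** more files **/)

// numSlices bounds the number of partitions, and therefore the number of
// simultaneous loadFileAndUploadToRDS calls.
spark.sparkContext
  .parallelize(files, numSlices = 32)
  .foreach(filepath => loadFileAndUploadToRDS(filepath))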
Upvotes: 1