morgan

Reputation: 73

How to process millions of small JSON files quickly using Scala Spark?

I have to process millions of JSON files from Azure Blob Storage, each representing one row, and need to load them into Azure SQL DB with some minimal transformation in between. These files come in at random times but follow the same schema.

My first solution basically just created a DataFrame for each file and pushed it into SQL. This worked when we were receiving hundreds of files, but now that we are receiving millions of files it is not scaling, taking over a day to process.
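
For reference, here is a minimal sketch of roughly what that first approach looked like; the connection options and the file listing are placeholders, not our actual code:

import org.apache.spark.sql.SparkSession

object PerFileLoad {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("per-file-load").getOrCreate()
    val jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net;databaseName=<db>" // placeholder
    val allFiles: Seq[String] = ??? // listing of blob paths, omitted here

    // One DataFrame and one JDBC round trip per file -- this is what stops
    // scaling once the file count reaches the millions.
    allFiles.foreach { path =>
      val df = spark.read.json(path) // each file holds a single row
      // ...minimal transformation here...
      df.write
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "dbo.Contact_VS")
        .mode("append")
        .save()
    }
  }
}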

We also tried processing the files in Scala without Spark (see code below) but this is also too slow; 500 files processed in 8 minutes.

var sql_statement = ""
allFiles.par.map(file_name => {
  // processing
  val json = scala.io.Source.fromFile(file_name).mkString
  val mapData1 = mapper.readValue(json, classOf[Map[String, Any]])
  val account = mapData1("Contact").asInstanceOf[Map[String, Any]]
  val common = account.keys.toList.intersect(srcDestMap.keys.toList)
  val trMap = common.map(rec => Map(srcDestMap(rec) -> account(rec))).flatten.toMap
  val vals = trMap.keys.toList.sorted.map(trMap(_).toString.replace("'", "''")).map("'" + _ + "'")
  // end processing

  val cols = "insert into dbo.Contact_VS(" + trMap.keys.toList.sorted.mkString(",") + ") values (" + vals.mkString(",") + ")"
  sql_statement = sql_statement + cols
})
val updated = statement.executeUpdate(sql_statement)
connection.close()

If anyone knows how to optimize this code, or has any out-of-the-box ideas for preprocessing our JSON, it would be greatly appreciated! The JSON is nested, so merging everything into one large JSON to be read into Spark is a little more involved, but we may have to go that way if no one has any better ideas.

Upvotes: 1

Views: 1151

Answers (2)

simpadjo

Reputation: 4017

Since you already got an answer, let me point out some problems with the raw Scala implementation (a rough sketch addressing them follows the list):

1) Creating SQL statements manually is error-prone and inefficient.

2) Updating sql_statement by string concatenation in a loop is very inefficient (and, since it is a shared var mutated from parallel tasks, also a race condition).

3) The level of parallelism of allFiles.par: .par shouldn't be used for blocking tasks, for two reasons:

  • it uses the global shared thread pool under the hood, so one batch of tasks will block other tasks.

  • the parallelism level is tuned for CPU-bound tasks (the number of CPU threads), while blocking IO like this wants much higher parallelism.
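
To make those points concrete, here is a rough sketch (not a drop-in replacement): a parameterized insert with JDBC batching instead of hand-built SQL strings, and a dedicated fixed-size thread pool sized well above the CPU count instead of .par. The column names, batch size, pool size and connection URL are all assumptions.

import java.sql.DriverManager
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object BatchedUpload {

  // Dedicated pool for blocking JDBC/file IO, deliberately much larger than the CPU count.
  implicit val ioPool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(64))

  def upload(allFiles: Seq[String]): Unit = {
    val work = allFiles.grouped(1000).toSeq.map { batch =>
      Future {
        val conn = DriverManager.getConnection("jdbc:sqlserver://...") // placeholder URL
        try {
          // Parameterized statement: the driver handles quoting, no manual escaping.
          val ps = conn.prepareStatement(
            "insert into dbo.Contact_VS(col1, col2) values (?, ?)") // assumed columns
          batch.foreach { file =>
            val (v1, v2) = parseContact(file) // your existing Jackson mapping
            ps.setString(1, v1)
            ps.setString(2, v2)
            ps.addBatch()
          }
          ps.executeBatch() // one round trip per batch instead of one giant string
        } finally {
          conn.close()
        }
      }
    }
    Await.result(Future.sequence(work), Duration.Inf)
  }

  def parseContact(file: String): (String, String) = ???
}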

Upvotes: 0

Andrew Long

Reputation: 933

You are close. Spark contains helper functions to parallelize tasks across the cluster. Note that you will want to set "spark.default.parallelism" to a sane number so that you're not creating too many connections to your DB.

  def loadFileAndUploadToRDS(filepath: String): Unit = ???

  @Test
  def parallelUpload(): Unit = {
    val files = List("s3://bucket/path" /** more files **/)
    spark.sparkContext.parallelize(files).foreach(filepath => loadFileAndUploadToRDS(filepath))
  }
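
One possible way the ??? could be filled in, purely as a sketch: it assumes the executors can read the files directly, reuses the Jackson mapping from the question, and uses placeholder connection, table and column names.

import java.sql.DriverManager

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

def loadFileAndUploadToRDS(filepath: String): Unit = {
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  // Parse the single-row JSON file, same shape as in the question.
  val json = scala.io.Source.fromFile(filepath).mkString
  val contact = mapper
    .readValue(json, classOf[Map[String, Any]])("Contact")
    .asInstanceOf[Map[String, Any]]

  val conn = DriverManager.getConnection("jdbc:sqlserver://...") // placeholder URL
  try {
    val ps = conn.prepareStatement(
      "insert into dbo.Contact_VS(SomeColumn) values (?)") // assumed column
    ps.setString(1, contact.getOrElse("SomeField", "").toString) // assumed field
    ps.executeUpdate()
  } finally {
    conn.close()
  }
}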

Upvotes: 1
