QuickSilver

Reputation: 4045

Merge multiple JSON files into a single JSON file and a Parquet file

Source S3 location with 100s of JSON files.

  1. All JSON files need to be combined into a single JSON file, i.e. a non part-0000... file.
  2. The single output JSON file needs to replace all of these files at the source S3 location.
  3. The same JSON data needs to be converted to Parquet and saved to another S3 location.

Is there any better option than the design below? (A rough sketch of this flow follows the list.)

  1. Load the JSON files into a DataFrame.
  2. Save it to local disk.
  3. Upload the combined JSON file to S3.
  4. Clean up the rest of the S3 files after the combined file is uploaded successfully, using the AWS SDK client API.
  5. In parallel with 4, save the Parquet file to the Parquet S3 location via the DataFrame API.
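
For reference, a minimal sketch of that flow, assuming Spark with the s3a connector and the AWS SDK v1 client; all paths, bucket names and keys below are hypothetical placeholders:

import java.io.File

import com.amazonaws.services.s3.AmazonS3ClientBuilder
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConverters._

val spark = SparkSession.builder().appName("merge-json").getOrCreate()

val sourcePath   = "s3a://my-bucket/input/json/"      // hypothetical source location
val localDir     = "/tmp/combined-json"               // hypothetical local staging dir
val parquetPath  = "s3a://my-bucket/output/parquet/"  // hypothetical Parquet target
val bucketName   = "my-bucket"                        // hypothetical bucket
val sourcePrefix = "input/json/"                      // hypothetical source prefix
val combinedKey  = "input/json/combined.json"         // hypothetical combined file key

// 1. Load all source JSON files into one DataFrame (cached so S3 is scanned only once)
val df = spark.read.json(sourcePath).cache()

// 2. Save a single combined JSON file on local disk (coalesce(1) => one part file)
df.coalesce(1).write.mode(SaveMode.Overwrite).json(s"file://$localDir")
val combinedFile = new File(localDir).listFiles().find(_.getName.startsWith("part-")).get

// 3 + 4. Upload the combined file, then delete the original objects via the AWS SDK
val s3 = AmazonS3ClientBuilder.defaultClient()
val originalKeys = s3.listObjects(bucketName, sourcePrefix)
  .getObjectSummaries.asScala.map(_.getKey).toList
s3.putObject(bucketName, combinedKey, combinedFile)
originalKeys.filterNot(_ == combinedKey).foreach(key => s3.deleteObject(bucketName, key))

// 5. Write the same data as Parquet to the other S3 location (can run concurrently with 4)
df.write.mode(SaveMode.Overwrite).parquet(parquetPath)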

I had the below question on the above design.

Upvotes: 3

Views: 4171

Answers (2)

QuickSilver

Reputation: 4045

import java.io.File
import java.net.URI

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// 1. Combine all source JSON files into a single JSON file at a temporary location
spark.read
  .json(sourcePath)
  .coalesce(1)
  .write
  .mode(SaveMode.Overwrite)
  .json(tempTarget1)

// 2. Delete the original JSON files from the source location
val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)

val deleted = fs.delete(new Path(sourcePath + File.separator), true)
logger.info(s"S3 folder path deleted=${deleted} sparkUuid=$sparkUuid path=${sourcePath}")

// 3. Move the combined file from the temporary location to the source location
val renamed = fs.rename(new Path(tempTarget1), new Path(sourcePath))

Tried and failed:

  1. DataFrame caching/persist did not work: whenever I tried to write, cachedDf.write went back and checked the S3 files, which I had already cleaned up manually before the write (see the sketch after this list).
  2. Writing the DataFrame directly to the same S3 directory does not work either, as the DataFrame only overwrites the partitioned files, i.e. the files starting with 'part-00...'.
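
As an illustration of point 1, here is a minimal sketch of the failing pattern (paths are hypothetical): cache() is lazy, so nothing is materialized before the source objects are removed, and the later write re-reads the now-missing files.

// Sketch of the failing pattern described above (hypothetical paths).
val cachedDf = spark.read.json("s3a://my-bucket/input/json/").cache() // lazy: nothing is read yet

// ... source objects under s3a://my-bucket/input/json/ are deleted here ...

// The write triggers the first real scan, re-reads the deleted source files,
// and fails with FileNotFoundException.
cachedDf.write.mode(SaveMode.Overwrite).json("s3a://my-bucket/input/json/")

// Forcing an action before the cleanup (e.g. cachedDf.count() or a small show())
// materializes the cache first, which is what the other answer's update relies on.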

Upvotes: 0

Ram Ghadiyaram

Reputation: 29237

Yes, it's possible to skip #2. Writing to the same location you read from can be done with SaveMode.Overwrite.

When you first read the JSON (i.e. #1) as a DataFrame, it will be in memory if you cache it. After that you can do a cleanup, combine all the JSON into one with union, and store it as a Parquet file in a single step. Something like this example:
Case 1: all JSONs are in different folders and you want to store the final DataFrame as Parquet in the same location where the JSONs are...

val dfpath1 = spark.read.json("path1")
val dfpath2 = spark.read.json("path2")
val dfpath3 = spark.read.json("path3")

// cleanup1/cleanup2/cleanup3 are your own functions of type DataFrame => DataFrame
val df1 = cleanup1(dfpath1)
val df2 = cleanup2(dfpath2)
val df3 = cleanup3(dfpath3)

val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // all DataFrames must have the same schema for union

finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with samelocations json.parquet")

Case 2: all JSONs are in the same folder and you want to store the final DataFrame as multiple Parquet files in the same root location where the JSONs are...

In this case there is no need to read multiple DataFrames; you can give the root path where the JSONs (all with the same schema) are:

val dfpath1 = spark.read.json("rootpathofyourjsons with same schema")
// or you can give multiple paths: spark.read.json("path1", "path2", "path3"),
// since the Spark DataFrame reader supports it: def json(paths: String*)

// cleanup1 is your own function of type DataFrame => DataFrame
val finaldf = cleanup1(dfpath1)
finaldf.write.mode(SaveMode.Overwrite).parquet("final_file with sameroot locations json.parquet")

AFAIK, in either case the AWS S3 SDK API is no longer required.

UPDATE: Regarding the FileNotFoundException you are facing... see the code example below for how to do it. I quoted the same example you showed me here.

import org.apache.spark.sql.functions._
import spark.implicits._ // for toDS / toDF

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Quick silver want to cleanup")) // like you said you want to clean it

// THE NEXT 2 LINES ARE IMPORTANT: `cache` plus a light action such as `show`
// materializes the data; without them a FileNotFoundException will occur.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // action

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("quick silver saved in same directory where he read it from final records he saved after clean up are  ")
df2.show(false)

Result :

+---+----+
|sex|date|
+---+----+
|1  |10  |
|2  |20  |
|3  |30  |
+---+----+

+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows

quick silver saved in same directory where he read it from final records he saved after clean up are  
+---+----+----------------------------+
|sex|date|cleanup                     |
+---+----+----------------------------+
|1  |10  |Quick silver want to cleanup|
|2  |20  |Quick silver want to cleanup|
|3  |30  |Quick silver want to cleanup|
+---+----+----------------------------+


Screenshot of the file being saved, read back, cleaned up, and saved again:


Note: you need to implement Case 1 or Case 2 as suggested in the update above...

Upvotes: 1
