Reputation: 4045
Source S3 location with hundreds of JSON part-0000... files. I had the below question on the above design: is there any better option apart from the approach below?
Upvotes: 3
Views: 4171
Reputation: 4045
import java.io.File
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// 1. Read all JSON part files, compact them into a single file, and write to a temp location
spark.read
  .json(sourcePath)
  .coalesce(1)
  .write
  .mode(SaveMode.Overwrite)
  .json(tempTarget1)

// 2. Delete the original source folder
val fs = FileSystem.get(new URI(s"s3a://$bucketName"), sc.hadoopConfiguration)
val deleted = fs.delete(new Path(sourcePath + File.separator), true)
logger.info(s"S3 folder path deleted=${deleted} sparkUuid=$sparkUuid path=${sourcePath}")

// 3. Rename (move) the temp output to the original source path
val renamed = fs.rename(new Path(tempTarget1), new Path(sourcePath))
Tried and failed: cachedDf.write still went back and read the source S3 files, which I had manually cleaned up before the write.
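A minimal sketch of the failing sequence described here (sourcePath and fs as defined in the code above; the exact ordering of steps is an assumption):
val cachedDf = spark.read.json(sourcePath).cache() // cache() is lazy -- nothing is materialized yet
fs.delete(new Path(sourcePath), true) // source files are deleted before any action has run
cachedDf.write.mode(SaveMode.Overwrite).json(sourcePath) // the first real scan happens here and fails with FileNotFoundException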
Upvotes: 0
Reputation: 29237
Yes, it is possible to skip #2. Writing to the same location can be done with SaveMode.Overwrite on the same location you read from.
When you first read the JSON (i.e. #1) as a DataFrame, it will be kept in memory if you cache it. After that you can do the cleanup and combine all the JSONs into one DataFrame with union, then store it as a Parquet file in a single step. Something like the examples below.
Case 1: All JSONs are in different folders and you want to store the final DataFrame as Parquet in the same location where the JSONs are...
val dfpath1 = spark.read.json("path1")
val dfpath2 = spark.read.json("path2")
val dfpath3 = spark.read.json("path3")

val df1 = cleanup1(dfpath1) // each cleanupN function takes a DataFrame and returns a cleaned DataFrame
val df2 = cleanup2(dfpath2) // (a hypothetical cleanup function is sketched below)
val df3 = cleanup3(dfpath3)

val dfs = Seq(df1, df2, df3)
val finaldf = dfs.reduce(_ union _) // all DataFrames must have the same schema for the union

finaldf.write.mode(SaveMode.Overwrite).parquet("samelocation/final_file.parquet") // write back to the same location the JSONs came from
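The cleanupN functions above are placeholders for whatever cleaning you need. A hypothetical example of one such function (the id column is made up):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical cleanup: drop rows with a null id and remove exact duplicates
def cleanup1(df: DataFrame): DataFrame =
  df.filter(col("id").isNotNull).dropDuplicates()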
Case 2: All JSONs are in the same folder and you want to store the final DataFrame as multiple Parquet files in the same root location where the JSONs are...
In this case there is no need to read multiple DataFrames; you can give the root path where the JSONs (all with the same schema) live:
val dfpath1 = spark.read.json("rootpathofyourjsons") // all JSONs under this root must share the same schema
// or you can give multiple paths: spark.read.json("path1", "path2", "path3"),
// since the DataFrameReader supports varargs: def json(paths: String*)

val finaldf = cleanup1(dfpath1) // cleanup1 takes a DataFrame and returns a cleaned DataFrame

finaldf.write.mode(SaveMode.Overwrite).parquet("rootpathofyourjsons") // write back to the same root location
AFAIK, in either case the AWS S3 SDK API is no longer required. Below is a runnable demonstration of overwriting the same location that was read from (a local path here; the same pattern applies to S3):
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq((1, 10), (2, 20), (3, 30)).toDF("sex", "date")
df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Quick silver want to cleanup")) // like you said you want to clean it

// The next two steps are important: `cache` plus a light action such as `show`.
// Without them, a FileNotFoundException is thrown when overwriting the source.
df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // any action will do

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("quick silver saved in same directory where he read it from final records he saved after clean up are ")
df2.show(false)
Result:
+---+----+
|sex|date|
+---+----+
|1 |10 |
|2 |20 |
|3 |30 |
+---+----+
+---+----+----------------------------+
|sex|date|cleanup |
+---+----+----------------------------+
|1 |10 |Quick silver want to cleanup|
|2 |20 |Quick silver want to cleanup|
+---+----+----------------------------+
only showing top 2 rows
quick silver saved in same directory where he read it from final records he saved after clean up are
+---+----+----------------------------+
|sex|date|cleanup |
+---+----+----------------------------+
|1 |10 |Quick silver want to cleanup|
|2 |20 |Quick silver want to cleanup|
|3 |30 |Quick silver want to cleanup|
+---+----+----------------------------+
Screenshot (omitted) showing the file saved, read back, cleaned up, and saved again.
Note: You need to combine this cache-and-action trick with Case 1 or Case 2 as suggested above; see the sketch below.
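A minimal sketch of Case 2 combined with the cache-plus-action trick (the s3a path is an assumption; cleanup1 is the hypothetical function from above):
val df = spark.read.json("s3a://bucket/rootpathofyourjsons")
val cleaned = cleanup1(df) // your cleanup logic
cleaned.cache() // cache before overwriting the source location
println(cleaned.count) // an action that materializes every partition into the cache
cleaned.write.mode(SaveMode.Overwrite).parquet("s3a://bucket/rootpathofyourjsons")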
Upvotes: 1