Reputation: 835
I am trying to split a set of S3 files, like the sample below, into individual folders based on the value of a column. I am not sure what is wrong with my code below.
column 1, column 2
20130401, value1
20130402, value2
20130403, value3
val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
uniq_days.cache()
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
Can you please help? Even a PySpark version is OK. I am looking for the following structure.
s3://xxxxxx-bucket/partitionedfolder/20130401/part-***
column 1, column 2
20130401, value1
s3://xxxxxx-bucket/partitionedfolder/20130402/part-***
column 1, column 2
20130402, value2
s3://xxxxxx-bucket/partitionedfolder/20130403/part-***
column 1, column 2
20130403, value3
Here is the error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 82.0 failed 4 times, most recent failure: Lost task 22.3 in stage 82.0 (TID 2753
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
Caused by: java.lang.NullPointerException
Update with current solution:
val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
Upvotes: 0
Views: 1867
Reputation: 947
I think you missed the "s" string-interpolator prefix on the path passed to save. :)
http://docs.scala-lang.org/overviews/core/string-interpolation.html#the-s-string-interpolator
Change:
write.save("s3://xxxxxx-bucket/partitionedfolder/$x/")})
To:
write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
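To make the difference concrete, here is a minimal sketch in plain Scala (no Spark needed). The value of x is a made-up example day, and the bucket path is the placeholder from the question.

```scala
// Hypothetical day value, standing in for one entry of uniq_days.
val x = "20130401"

// Without the s prefix, "$x" stays in the string as literal text.
val plain = "s3://xxxxxx-bucket/partitionedfolder/$x/"

// With the s prefix, "$x" is replaced by the value of x.
val interpolated = s"s3://xxxxxx-bucket/partitionedfolder/$x/"

println(plain)
println(interpolated)
```

Without the prefix, every iteration would try to write to the same literal ".../$x/" path.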
There is another issue: show() only prints the DataFrame and returns Unit, so uniq_days is not a collection that you can cache or iterate over.
Change:
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
uniq_days.cache()
To:
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList
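As a rough illustration of why this matters, the sketch below contrasts show() with collect() on a tiny in-memory DataFrame. It assumes a Spark 2.x+ local SparkSession in place of the sqlContext from the question, made-up sample rows in place of the parquet files, and that column1 holds strings (getString(0) would fail on a numeric column). It is meant to be run as a Scala script with Spark on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local SparkSession stands in for the sqlContext used in the question.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("show-vs-collect")
  .getOrCreate()
import spark.implicits._

// Hypothetical sample rows in place of the parquet data on S3.
val newDataDF = Seq(
  ("20130401", "value1"),
  ("20130402", "value2"),
  ("20130401", "value3")
).toDF("column1", "column2")

// show() prints the rows to stdout and returns Unit: nothing to cache or loop over.
val printed: Unit = newDataDF.select(newDataDF("column1")).distinct.show()

// collect() brings the distinct values back to the driver as a local List[String].
// Sorted here only to make the order deterministic.
val uniqDays: List[String] =
  newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList.sorted

// Each element is now a plain String, so it can be used in a filter and in a path.
uniqDays.foreach { day =>
  val rows = newDataDF.filter(newDataDF("column1") === day).count()
  println(s"partitionedfolder/$day/ would receive $rows row(s)")
}
```

Because the list lives on the driver, the foreach runs there too, which is what allows newDataDF to be used inside the loop.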
Upvotes: 2