androboy
androboy

Reputation: 835

Spark split a file into multiple folders based on a field

I am trying to split a set of S3 files like below based on a column into individual column based folders. I am not sure of the problem with my code below.

column 1, column 2
20130401, value1
20130402, value2
20130403, value3

val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
    newDataDF.cache()
    val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
    uniq_days.cache()
    uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})

Can you please help? Even a pyspark version is ok. I am looking for following structure.

s3://xxxxxx-bucket/partitionedfolder/20130401/part-***

    column 1, column 2
    20130401, value 1
s3://xxxxxx-bucket/partitionedfolder/20130402/part-***

    column 1, column 2
    20130402, value 1
s3://xxxxxx-bucket/partitionedfolder/20130403/part-***

    column 1, column 2
    20130403, value 1

Here is the error

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 82.0 failed 4 times, most recent failure: Lost task 22.3 in stage 82.0 (TID 2753

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
Caused by: java.lang.NullPointerException

Update with current solution:

val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})

Upvotes: 0

Views: 1867

Answers (1)

RBanerjee
RBanerjee

Reputation: 947

I think you missed "s" in the save. :)

http://docs.scala-lang.org/overviews/core/string-interpolation.html#the-s-string-interpolator

Change:

write.save("s3://xxxxxx-bucket/partitionedfolder/$x/")})

To:

write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})

There are more issues, show never returns any value.

Change:

val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.show()
uniq_days.cache()

To:

val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.rdd.map(_.getString(0)).collect().toList

Upvotes: 2

Related Questions