Reputation: 155
The business case: we'd like to split a big Parquet file into smaller ones, partitioned by a column. We tried the DataFrame writer with partitionBy (dataframe.write.partitionBy("xxx")...), and it took about 1 hour for 100K records. So we want to use a MapReduce-style approach to generate a separate Parquet file in a different folder per key. Sample code:
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

// Routes each (key, value) pair to a file named after its key, e.g. <key>/aa
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/aa"
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SplitTest")
    val sc = new SparkContext(conf)
    sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))
      .map(value => (value._1, value._2 + "Test"))
      .partitionBy(new HashPartitioner(3))
      // Alternative: .saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
      .saveAsHadoopFile(args(0), classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}
The sample above only generates text files. How can I generate Parquet files with a MultipleOutputFormat?
Upvotes: 0
Views: 605
Reputation: 330443
Spark has supported partitioned Parquet writes out of the box since 1.4.0 (1.5+ syntax):
df.write.partitionBy("some")
Bucketing is supported since 2.0.0:
df.write.bucketBy(numBuckets, "some")
with an optional sortBy clause.
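A hedged sketch of bucketing with sortBy (the bucket count, column name "some", input path, and table name are illustrative; bucketed output has to be written with saveAsTable rather than a plain path):
// Assumes `spark` is an existing SparkSession and "some" is a column of df.
val df = spark.read.parquet("/path/to/input")   // hypothetical input path
df.write
  .bucketBy(8, "some")           // 8 buckets hashed on column "some"
  .sortBy("some")                // optional: sort rows within each bucket
  .saveAsTable("bucketed_table") // bucketed writes go through the table API
Bucketing pre-shuffles data by hash, which can avoid shuffles in later joins and aggregations on the bucketing column.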
Upvotes: 1