Jerome tan

Reputation: 155

Does Spark support multiple output files with Parquet format?

The business case is that we'd like to split a big Parquet file into small ones, using one column as the partition key. We tested the DataFrame writer, dataframe.write.partitionBy("xxx").parquet(...), and it took about 1 hour for 100K records. So we are now trying a MapReduce-style output format that writes a separate Parquet file into a separate folder per key.
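A rough sketch of the DataFrame write we benchmarked, assuming a Spark 1.4+ SQLContext named sqlContext; the column name "xxx" and the paths are placeholders:

// Read the big Parquet file and rewrite it partitioned by column "xxx",
// producing one sub-directory per distinct value, e.g. /path/to/output/xxx=value1/
val df = sqlContext.read.parquet("/path/to/big.parquet")
df.write
  .partitionBy("xxx")
  .parquet("/path/to/output")

The MultipleTextOutputFormat-based sample code we are testing instead: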

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark._
import org.apache.spark.SparkContext._

// Routes each record to a file named after its key, e.g. <output>/w/aa.
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.asInstanceOf[String] + "/aa"
}

object Split {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SplitTest")
    val sc = new SparkContext(conf)
    sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))
      .map(value => (value._1, value._2 + "Test"))
      .partitionBy(new HashPartitioner(3))
      // Old (mapred) API; a new-API variant would go through
      // saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
      .saveAsHadoopFile(args(0), classOf[String], classOf[String],
        classOf[RDDMultipleTextOutputFormat])
    sc.stop()
  }
}

The sample above only generates text files. How can I generate Parquet files with a MultipleOutputFormat-style approach?

Upvotes: 0

Views: 605

Answers (1)

zero323

Reputation: 330443

Spark has supported Parquet partitioning on write since 1.4.0 (the syntax below is for 1.5+):

df.write.partitionBy("some")

and bucketing since 2.0.0:

df.write.bucketBy(numBuckets, "some")

with an optional sortBy clause.
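Applied to the question's scenario, a minimal sketch, assuming a DataFrame df that already holds the data and a partition column named "key"; the bucket count, paths, and table name are placeholders:

// Partitioned Parquet output: one sub-directory per distinct value of "key",
// e.g. /path/to/output/key=w/part-... (Spark 1.4+).
df.write
  .partitionBy("key")
  .parquet("/path/to/output")

// Bucketing (Spark 2.0+) is only supported for persistent tables, so it is
// written with saveAsTable; 42 buckets and the table name are placeholders.
df.write
  .bucketBy(42, "key")
  .sortBy("value")              // optional sort within each bucket
  .saveAsTable("bucketed_table")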

Upvotes: 1
