Reputation: 441
My sparquet file like this
1, a, 1980-09-08
2, b, 1980-09-08
3, c, 2017-09-09
Hope the output file like this
the folder 19800908
contains data
1, a, 1980-09-08
2, b, 1980-09-08
and the folder 20170909
contains data
3, c, 2017-09-09
I know can groupBy key date
but don't know how to output multiple parquet file use such class MultipleTextOutputFormat
I don't want to foreach loop the keys, which to slow and need a lot of memory
now the code like this
val input = sqlContext.read.parquet(sourcePath)
.persist(StorageLevel.DISK_ONLY)
val keyRows: RDD[(Long, Row)] =
input.mapPartitions { partition =>
partition.flatMap { row =>
val key = format.format(row.getDate(3)).toLong
Option((key, row))
}
}.persist(StorageLevel.DISK_ONLY)
val keys = keyRows.keys.distinct().collect()
for (key <- keys) {
val rows = keyRows.filter { case (_key, _) => _key == key }.map(_._2)
val df = sqlContext.createDataFrame(rows, input.schema)
val path = s"${outputPrefix}/$key"
HDFSUtils.deleteIfExist(path)
df.write.parquet(path)
}
If I use the MultipleTextOutputFormat the output as follows which not I want
keyRows.groupByKey()
.saveAsHadoopFile(conf.getOutputPrefixDirectory, classOf[String], classOf[String],
classOf[SimpleMultipleTextOutputFormat[_, _]])
public class SimpleMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
@Override
protected String generateFileNameForKeyValue(A key, B value, String name) {
// return super.generateFileNameForKeyValue(key, value, name);
return key.toString();
}
}
Upvotes: 0
Views: 3853
Reputation: 441
from this post spark partition data writing by timestamp
input
.withColumn("_key", date_format(col(partitionField), format.toPattern))
.write
.partitionBy("_key")
.parquet(conf.getOutputPrefixDirectory)
But how to remove the folder name '_ke='
Upvotes: 0
Reputation: 7207
Writing with partitioned column can be used:
df.write.partitionBy("dateString").parquet("/path/to/file").
Difference - folder name will be like "dateString=2017-09-09", and new string column "dateString" have to be created before saving.
Upvotes: 1