yjxyjx

Reputation: 121

How can I make saveAsTextFile (Spark 1.6) append to an existing file?

In Spark SQL I use DF.write.mode(SaveMode.Append).json(xxxx), but this method produces a directory of part files with long, randomly generated names (screenshot of the output files omitted).

The filenames are too complex and random, so I can't retrieve them through the API. I want to use saveAsTextFile instead, because its filenames are simple and predictable, but I don't know how to append to a file in the same directory. Thanks for your time.

Upvotes: 4

Views: 16439

Answers (3)

Ram Ghadiyaram

Reputation: 29155

This worked for me on Spark 1.5; I think this is the right usage:

    dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).partitionBy("parameter1", "parameter2").save(path);
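
For reference, a minimal Scala sketch of the same append-and-partition write (the output path, format, and partition columns are placeholder assumptions, not from the question):

    import org.apache.spark.sql.SaveMode

    // Sketch: append JSON output partitioned by two hypothetical columns.
    // "year" and "month" are placeholder column names; replace with your own.
    df.write
      .mode(SaveMode.Append)
      .format("json")
      .partitionBy("year", "month")
      .save("/data/output")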

Upvotes: 2

Matiji66

Reputation: 737

You can try this method, which I found in Process Spark Streaming rdd and store to single HDFS file:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path }
    import org.apache.spark.rdd.RDD

    // Write the RDD to a temporary directory, then merge its part files
    // into a single named file under the destination directory.
    def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = {
      val sourceFile = hdfsServer + "/tmp/"
      rdd.saveAsTextFile(sourceFile)
      val dstPath = hdfsServer + "/final/"
      merge(sourceFile, dstPath, fileName)
    }

    def merge(srcPath: String, dstPath: String, fileName: String): Unit = {
      val hadoopConfig = new Configuration()
      val hdfs = FileSystem.get(hadoopConfig)
      val destinationPath = new Path(dstPath)
      if (!hdfs.exists(destinationPath)) {
        hdfs.mkdirs(destinationPath)
      }
      // copyMerge concatenates every part file under srcPath into one target file.
      FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null)
    }
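
A hypothetical usage sketch (the HDFS URI, output file name, and sample RDD are illustrative assumptions):

    // Assumes an existing SparkContext `sc` and a reachable HDFS namenode.
    val lines = sc.parallelize(Seq("line1", "line2", "line3"))
    saveAsTextFileAndMerge("hdfs://namenode:8020", "output.txt", lines)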

Upvotes: 2

NehaM

Reputation: 1352

Since Spark writes through HDFS, this directory of part files is the typical output it produces. You can use FileUtil to merge the part files back into one. This is more efficient than repartitioning the data into a single partition, which would force Spark to pull the whole dataset through one executor's memory. This is the approach I follow.

    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
    val hdfs = FileSystem.get(hadoopConf)
    // filePath is the output directory the DataFrame is written to.
    val path = new Path(filePath)
    val mergedPath = "merged-" + filePath + ".json"
    val merged = new Path(mergedPath)
    if (hdfs.exists(merged)) {
      hdfs.delete(merged, true)
    }
    df.write.mode(SaveMode.Append).json(filePath)

    // Merge all part files under filePath into the single merged file.
    FileUtil.copyMerge(hdfs, path, hdfs, merged, false, hadoopConf, null)

You can then read the single file from the mergedPath location. Hope it helps.
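
For instance, a minimal sketch of reading it back (assuming the mergedPath value from the snippet above):

    // Read the merged single JSON file back into a DataFrame.
    val mergedDf = sqlContext.read.json(mergedPath)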

Upvotes: 2
