JSR29

Reputation: 354

How to write Spark Streaming output to HDFS without overwriting

After some processing I have a DStream[String, ArrayList[String]]. When I write it to HDFS using saveAsTextFile, every batch overwrites the existing data. How can I write the new results by appending to the previous results?

output.foreachRDD(r => {
  r.saveAsTextFile(path)
})

Edit: It would also help if someone could show how to convert the output to Avro format and then write it to HDFS with appending.

Upvotes: 3

Views: 11652

Answers (4)

Sudeep Singh Thakur

Reputation: 99

Here is how I solved the issue without a DataFrame:

import java.time.format.DateTimeFormatter
import java.time.LocalDateTime

messages.foreachRDD { rdd =>
  // Extract the record values and coalesce to a single partition so each
  // batch produces one output file instead of one file per partition.
  val eachRdd = rdd.map(record => record.value).repartition(1)
  if (!eachRdd.isEmpty()) {
    // Each batch is written to its own timestamped directory, so earlier results are never overwritten.
    eachRdd.saveAsTextFile(hdfs_storage + DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now) + "/")
  }
}

Upvotes: 0

user11715137

Reputation: 1

Storing streaming output to HDFS will always create new files, even when you use append mode with Parquet, which leads to a small-files problem on the NameNode. I would recommend writing your output to sequence files, where you can keep appending to the same file.
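
A minimal sketch of that idea, assuming Hadoop 2.6.1+ (which added SequenceFile.Writer.appendIfExists) and a small DStream[(String, String)]; the dstream variable, target path, and key/value types are illustrative, not from the question:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{SequenceFile, Text}

dstream.foreachRDD { rdd =>
  // Collecting to the driver is only reasonable if each micro-batch is small.
  val records = rdd.collect()
  if (records.nonEmpty) {
    val writer = SequenceFile.createWriter(
      new Configuration(),
      SequenceFile.Writer.file(new Path("hdfs:///data/stream.seq")), // hypothetical target
      SequenceFile.Writer.keyClass(classOf[Text]),
      SequenceFile.Writer.valueClass(classOf[Text]),
      SequenceFile.Writer.appendIfExists(true))                      // reopen and append
    try records.foreach { case (k, v) => writer.append(new Text(k), new Text(v)) }
    finally writer.close()
  }
}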

Upvotes: 0

Ishan Kumar

Reputation: 1982

If you want to append to the same file and store it on the file system, store it as a Parquet file. You can do it like this:

import org.apache.spark.sql.SaveMode

kafkaData.foreachRDD { rdd =>
  if (rdd.count() > 0) {
    import sparkSession.implicits._   // required for rdd.toDF()
    val df = rdd.toDF()
    df.write.mode(SaveMode.Append).save("/path")
  }
}
Upvotes: 0

maasg

Reputation: 37435

saveAsTextFile does not support append. If called with a fixed filename, it will overwrite it every time. We could do saveAsTextFile(path + timestamp) to save to a new file every time. That's the basic functionality of DStream.saveAsTextFiles(path).
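
As a rough illustration of both variants (output is the DStream from the question, and the paths are placeholders):

// Option 1: DStream.saveAsTextFiles writes each batch under a new
// time-suffixed directory, e.g. .../batch-1465692000000.txt
output.saveAsTextFiles("hdfs:///data/stream/batch", "txt")

// Option 2: the same effect done by hand, suffixing the path with the batch time
output.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) rdd.saveAsTextFile(s"hdfs:///data/stream/batch-${time.milliseconds}")
}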

An easily accessible format that supports append is Parquet. We first transform our data RDD to a DataFrame or Dataset and then we can benefit from the write support offered on top of that abstraction.

case class DataStructure(field1,..., fieldn)

... streaming setup, dstream declaration, ...

val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)
structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
}

Note that appending to Parquet files gets more expensive over time, so rotating the target file from time to time is still a requirement.
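
One way to do that rotation (illustrative only, reusing the workDir and df names from the snippet above) is to derive the target directory from the current date, so appends go to a fresh Parquet location each day:

import java.time.LocalDate

// Roll the Parquet target daily so no single directory keeps growing forever.
val rotatedTarget = s"$workDir/output/dt=${LocalDate.now}"
df.write.format("parquet").mode("append").save(rotatedTarget)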

Upvotes: 5
