Reputation: 354
After some processing I have a DStream[String, ArrayList[String]]. When I write it to HDFS using saveAsTextFile, every batch overwrites the previous data. How can I write the new results by appending to the previous results?
output.foreachRDD(r => {
  r.saveAsTextFile(path)
})
Edit: Could anyone also help me convert the output to Avro format and then write it to HDFS with appending?
Upvotes: 3
Views: 11652
Reputation: 99
Here is how I solved the issue without using a DataFrame:
import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
messages.foreachRDD { rdd =>
  // repartition returns a new RDD, so it has to be chained into the transformation
  val eachRdd = rdd.repartition(1).map(record => record.value)
  if (!eachRdd.isEmpty) {
    // write each batch to its own timestamped directory instead of overwriting a fixed path
    eachRdd.saveAsTextFile(hdfs_storage + DateTimeFormatter.ofPattern("yyyyMMddHHmmss").format(LocalDateTime.now) + "/")
  }
}
Upvotes: 0
Reputation: 1
Storing streaming output to HDFS will always create new files, even when you use append with Parquet, which leads to a small-files problem on the Namenode. I would recommend writing your output to sequence files instead, where you can keep appending to the same file; see the sketch below.
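A minimal sketch of that approach, assuming Hadoop 2.6.1+ (for Writer.appendIfExists) and a hypothetical output path and record shape:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{SequenceFile, Text}
import org.apache.hadoop.io.SequenceFile.Writer

val conf = new Configuration()
// appendIfExists reopens an existing sequence file and appends instead of overwriting it
val writer = SequenceFile.createWriter(
  conf,
  Writer.file(new Path("/data/stream/output.seq")),   // hypothetical target file
  Writer.keyClass(classOf[Text]),
  Writer.valueClass(classOf[Text]),
  Writer.appendIfExists(true)
)
try {
  // records: hypothetical Iterable[(String, String)] collected from one batch
  records.foreach { case (k, v) => writer.append(new Text(k), new Text(v)) }
} finally {
  writer.close()
}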
Upvotes: 0
Reputation: 1982
If you want to append to the same location on the file system, store it as a Parquet file. You can do it like this:
import org.apache.spark.sql.SaveMode

kafkaData.foreachRDD(rdd => {
  if (rdd.count() > 0) {
    import sparkSession.implicits._   // needed for rdd.toDF()
    val df = rdd.toDF()
    df.write.mode(SaveMode.Append).save("/path")   // Parquet is the default format
  }
})
Upvotes: 0
Reputation: 37435
saveAsTextFile does not support append. If called with a fixed filename, it will overwrite it every time. We could do saveAsTextFile(path + timestamp) to save to a new file every time. That's the basic functionality of DStream.saveAsTextFiles(path).
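A minimal sketch of that variant, assuming an outputDStream of strings and a hypothetical output directory:
// Writes one directory per batch, named <prefix>-<batch time in ms>.txt
outputDStream.saveAsTextFiles(s"$workDir/batch", "txt")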
An easily accessible format that supports append is Parquet. We first transform our data RDD to a DataFrame or Dataset, and then we can benefit from the write support offered on top of that abstraction.
case class DataStructure(field1,..., fieldn)
... streaming setup, dstream declaration, ...
val structuredOutput = outputDStream.map(record => mapFunctionRecordToDataStructure)
structuredOutput.foreachRDD { rdd =>
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.format("parquet").mode("append").save(s"$workDir/$targetFile")
}
Note that appending to Parquet files gets more expensive over time, so rotating the target file from time to time is still a requirement.
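One possible rotation scheme, as a sketch (the daily bucketing and path layout are assumptions, reusing the df, workDir and targetFile names from the snippet above):
// Append to a per-day target so each Parquet directory stays bounded in size
val day = java.time.LocalDate.now.toString   // e.g. "2024-05-01"
df.write.format("parquet").mode("append").save(s"$workDir/$targetFile-$day")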
Upvotes: 5