Reputation: 95
Maybe somebody has used this in some project: I write to Cassandra from Spark, and in Spark I use KafkaUtils.createDirectStream. Through the Spark-Cassandra connector we can use the DStream.saveToCassandra method (see the sketch below).
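For reference, that part looks roughly like this; the keyspace, table, and column names are placeholders:

import com.datastax.spark.connector._           // SomeColumns and column-name implicits
import com.datastax.spark.connector.streaming._ // adds saveToCassandra to DStreams

// "my_keyspace", "my_table", "key", "value" are placeholder names
stream.map(record => (record.key, record.value))
  .saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))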
But for saving/appending to HDFS I use:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

stream.map(_.value).foreachRDD { rdd =>
  val conf = new Configuration()
  conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/")
  val fs = FileSystem.get(conf)
  // append() returns an FSDataOutputStream and requires the file to already exist
  val out = fs.append(new Path("textfile.txt"))
  out.write(rdd.collect().mkString.getBytes()) // collect() pulls the whole batch to the driver
  out.close()
  fs.close()
}
But I don't think it is the best way to do this. It may be better to use something like:
val prepStr = {
  val str = new StringBuilder
  if (!rdd.isEmpty()) {
    str.append(rdd.collect().mkString)
  }
  str
}
And finally:
val out = fs.append(path)
out.write(prepStr.toString.getBytes()) // close out (and fs) when done
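Putting the two snippets together, a minimal sketch of this approach, assuming the target file already exists on HDFS (FileSystem.append requires that) and that collecting each micro-batch to the driver is acceptable:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

stream.map(_.value).foreachRDD { rdd =>
  if (!rdd.isEmpty()) { // skip empty micro-batches entirely
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/")
    val fs = FileSystem.get(conf)
    val out = fs.append(new Path("textfile.txt"))
    try out.write(rdd.collect().mkString.getBytes())
    finally { out.close(); fs.close() }
  }
}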
Or maybe somebody has used another way?
Upvotes: 2
Views: 571
Reputation: 37435
Assuming that your stream is of type DStream[String], you could make use of the append save mode offered by the DataFrame writer:
import org.apache.spark.sql.SparkSession

dstream.foreachRDD { rdd =>
  // getOrCreate() returns the running session; safe to call inside foreachRDD
  val sparkSession = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.mode("append").text("/hdfs/path/to/file")
}
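Wired up to the Kafka stream from the question, this could look roughly as follows; note that .map(_.value) already yields the DStream[String] assumed above:

// record values from Kafka become the single text column
val lines = stream.map(_.value)

lines.foreachRDD { rdd =>
  val sparkSession = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import sparkSession.implicits._
  // each micro-batch adds new part files under the target directory
  rdd.toDF().write.mode("append").text("/hdfs/path/to/file")
}

Note that text() treats /hdfs/path/to/file as a directory: append mode adds new part files per batch instead of growing one file, so no manual FileSystem.append bookkeeping is needed.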
Upvotes: 2