Ivan Alex

Reputation: 95

From Kafka to HDFS through Spark

Maybe somebody has used this in some project: I write to Cassandra from Spark, and in Spark I use KafkaUtils.createDirectStream to read from Kafka. Through the Spark-Cassandra connector we can use the DStream.saveToCassandra method. But for saving/appending to HDFS I use:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

stream.map(_.value).foreachRDD { rdd =>
  val conf = new Configuration()
  conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/")
  val fs = FileSystem.get(conf)
  // append() returns an FSDataOutputStream that must be closed explicitly
  val out = fs.append(new Path("textfile.txt"))
  out.write(rdd.collect().mkString.getBytes())
  out.close()
  fs.close()
}
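
For context, the Kafka-to-Cassandra side mentioned above looks roughly like this (a simplified sketch; the broker address, topic, group id, keyspace, and table names are placeholders, and ssc is the StreamingContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",   // placeholder broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group"              // placeholder consumer group
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

// saveToCassandra comes from the connector's streaming implicits
stream.map(record => (record.key, record.value))
  .saveToCassandra("my_keyspace", "my_table", SomeColumns("key", "value"))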

But I don't think this is the best way to do it. It might be better to use something like:

// intended to run inside foreachRDD, where rdd is in scope
val prepStr = {
  val str = new StringBuilder
  if (!rdd.isEmpty()) {
    str.append(rdd.collect().mkString)
  }
  str
}

And finally:

val out = fs.append(path)
out.write(prepStr.toString.getBytes())
out.close()
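
Put together inside foreachRDD, the idea would be something like this (a sketch reusing the names from the snippets above; the empty check avoids opening and appending to the file for empty micro-batches):

stream.map(_.value).foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val conf = new Configuration()
    conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000/")
    val fs = FileSystem.get(conf)
    val out = fs.append(new Path("textfile.txt"))
    out.write(rdd.collect().mkString.getBytes())
    out.close()
    fs.close()
  }
}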

Or has somebody used another way?

Upvotes: 2

Views: 571

Answers (1)

maasg

Reputation: 37435

Assuming that your stream is of type DStream[String], you could make use of the append save mode offered by the DataFrame writer:

dstream.foreachRDD { rdd =>
  // sparkSession must be in scope; its implicits provide rdd.toDF()
  import sparkSession.implicits._
  val df = rdd.toDF()
  df.write.mode("append").text("/hdfs/path/to/file")
}
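
For completeness, one way to get a sparkSession inside the closure is to build (or reuse) one from the RDD's SparkContext. This is a sketch; note that append mode adds new part files under the target directory on every micro-batch, and the text source expects a single string column:

import org.apache.spark.sql.SparkSession

dstream.foreachRDD { rdd =>
  // getOrCreate reuses an existing session for this SparkContext if one exists
  val sparkSession = SparkSession.builder
    .config(rdd.sparkContext.getConf)
    .getOrCreate()
  import sparkSession.implicits._
  // "append" writes new files rather than overwriting earlier batches
  rdd.toDF().write.mode("append").text("/hdfs/path/to/file")
}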

Upvotes: 2
