Reputation: 2994
I have a sorted DStream, which I can print as follows:
new Function<JavaPairRDD<Double, String>, Void>() {
    public Void call(JavaPairRDD<Double, String> rdd) {
        String out = "\n Top Values: \n";
        // take(10) brings the first 10 elements of the sorted RDD to the driver
        for (Tuple2<Double, String> t : rdd.take(10)) {
            out = out + t.toString() + "\n";
        }
        System.out.println(out);
        return null;
    }
}
However, I would like to save this to a text file instead of just printing the 10 values. Please note: I want to save only the top 10 values to the text file, not the entire DStream.
I'd appreciate any help. Also, I am coding in Java, not Scala.
Upvotes: 0
Views: 910
Reputation: 1
You can do it like this:
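import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DStreamTopN {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("DStreamTopN").setMaster("local[3]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    val lines = ssc.receiverStream(new UdpReceiver(1514, "UTF-8"))
    val wc = lines.flatMap(_.split(" ")).map(_ -> 1).reduceByKey(_ + _)
    val sort = wc.transform((rdd: RDD[(String, Int)]) => {
      val topN = rdd.sortBy(_._2, false).take(3)
      rdd.sparkContext.parallelize(topN) // take returns a local Array; transform must return an RDD
    })
    sort.print() // an output operation is required before the context can start
    ssc.start()
    ssc.awaitTermination()
  }
}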
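Since take returns an ordinary local collection on the driver, the top values can also be written out with plain file I/O instead of being printed. Below is a minimal sketch in Java (the language the question asks for), not a definitive implementation. It assumes the taken elements have already been converted to strings, and the class name TopValuesWriter, the method writeTop, and the temporary file are all hypothetical names chosen for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopValuesWriter {
    // Hypothetical helper: writes at most n entries to the file, one per line.
    // An existing file at that path is overwritten.
    public static void writeTop(List<String> values, int n, Path file) throws IOException {
        int count = Math.min(n, values.size());
        List<String> top = new ArrayList<>(values.subList(0, count));
        Files.write(file, top, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the strings produced from rdd.take(10) inside foreachRDD.
        List<String> taken = Arrays.asList("(0.9,a)", "(0.7,b)", "(0.4,c)");
        Path file = Files.createTempFile("top-values", ".txt");
        writeTop(taken, 10, file);
        System.out.println(Files.readAllLines(file, StandardCharsets.UTF_8));
    }
}
```

In a streaming job, a helper like this would be called once per batch, so the target path would need to vary per batch (for example, by including the batch time) to avoid overwriting earlier output.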
Upvotes: 0
Reputation: 7452
Assuming that your input is already sorted, you can do this in Scala:
val location = "hdfs://..."
val target = 10
sorted.foreachRDD { (rdd, time) =>
  // Determine how many elements preceded each partition (cumulative counts).
  val partitionElemCounts = rdd.mapPartitions(items =>
    Iterator(items.size)).collect().scanLeft(0) { case (sum, e) => sum + e }
  // Get the number of elements in each partition we need.
  // Note: mapPartitionsWithIndex passes the partition index first, then the iterator.
  val nRdd = rdd.mapPartitionsWithIndex { (partition, items) =>
    items.take(math.max(0, target - partitionElemCounts(partition)))
  }
  // We append the time to the path so each segment is written out to a different directory.
  val out = location + time
  nRdd.saveAsTextFile(out)
}
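The per-partition arithmetic in this answer can be illustrated without Spark. The following is a minimal sketch in plain Java, under the assumption that a list of lists stands in for the ordered partitions of a sorted RDD; the class name TopNAcrossPartitions and the method takeAcross are hypothetical and not part of any Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TopNAcrossPartitions {
    // Hypothetical helper: returns the first `target` elements across ordered partitions.
    public static <T> List<T> takeAcross(List<List<T>> partitions, int target) {
        // preceding[i] = number of elements in all partitions before partition i
        // (the equivalent of the scanLeft over partition sizes).
        int[] preceding = new int[partitions.size() + 1];
        for (int i = 0; i < partitions.size(); i++) {
            preceding[i + 1] = preceding[i] + partitions.get(i).size();
        }
        List<T> result = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            List<T> part = partitions.get(i);
            // Elements still needed once the earlier partitions have contributed theirs.
            int needed = Math.max(0, target - preceding[i]);
            result.addAll(part.subList(0, Math.min(needed, part.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(4, 5),
                Arrays.asList(6, 7, 8, 9));
        System.out.println(takeAcross(partitions, 4)); // prints [1, 2, 3, 4]
    }
}
```

Each partition only needs to know how many elements came before it, so in the Spark version the selection runs on the executors and only the small array of counts is collected to the driver.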
Upvotes: 1