Angshusuri

Reputation: 57

value saveAsTextFile is not a member of org.apache.spark.streaming.dstream.DStream[(String, Long)]

I am trying to save my word count result to a file.

val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.saveAsTextFile("/home/hadoop/datafile1")

But it shows this error:

value saveAsTextFile is not a member of org.apache.spark.streaming.dstream.DStream[(String, Long)]
[error]     wordCounts.saveAsTextFile("/home/hadoop/datafile1")

I am using Spark 2.1. I saw one answer, but it suggests an approach for an older Spark version, and I want to do this in Spark 2.1. Thanks.

Upvotes: 2

Views: 2972

Answers (2)

Amit

Reputation: 1121

The API documentation lists the DStream method as "saveAsTextFiles" (plural):

saveAsTextFiles(String prefix, String suffix)

Save each RDD in this DStream as at text file, using string representation of elements.
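
For illustration, here is a minimal sketch of how saveAsTextFiles might be used in the word count job from the question. The socket source on localhost:9999, the 10-second batch interval, the local[2] master, the object name, and the "txt" suffix are all assumptions made for the example; only the map/reduceByKey line comes from the question.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountToFiles {
  def main(args: Array[String]): Unit = {
    // Assumption: local run with two threads (one for the receiver, one for processing)
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountToFiles")
    // Assumption: 10-second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))

    // Assumption: input lines arrive on a local socket
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

    // DStream method (plural): writes one output directory per batch,
    // named <prefix>-<batch time in ms>.<suffix>
    wordCounts.saveAsTextFiles("/home/hadoop/datafile1", "txt")

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Each batch should then produce its own directory (for example /home/hadoop/datafile1-<TIME_IN_MS>.txt) containing part files, rather than a single text file.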

Upvotes: 1

Vidya

Reputation: 30310

You are using a method defined for RDD on a DStream.

This is the method on RDD:

def saveAsTextFile(path: String): Unit

...with description "Save this RDD as a text file, using string representations of elements."

This is the method on DStream:

def saveAsTextFiles(prefix: String, suffix: String = ""): Unit

...with description "Save each RDD in this DStream as at text file, using string representation of elements. The file name at each batch interval is generated based on prefix and suffix: 'prefix-TIME_IN_MS.suffix'."

So the method signatures are different--both in name and parameters.
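
To make the documented naming pattern concrete, here is a small plain-Scala sketch. The object and the batchFileName helper are hypothetical and written only for illustration; they are not part of the Spark API and simply mirror the "prefix-TIME_IN_MS.suffix" pattern quoted above.

```scala
object BatchFileName {
  // Hypothetical helper: builds a name following the documented
  // "prefix-TIME_IN_MS.suffix" pattern; the suffix is left off when empty.
  def batchFileName(prefix: String, suffix: String, timeMs: Long): String =
    if (suffix.isEmpty) s"$prefix-$timeMs"
    else s"$prefix-$timeMs.$suffix"
}
```

So a call such as saveAsTextFiles("/home/hadoop/datafile", "txt") would be expected to create a differently named output path for every batch interval.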

In your code, wordCounts is apparently a DStream, so it does not have a saveAsTextFile method.

However, I get the feeling you are confusing the abstractions and really want to write the individual RDDs contained in a DStream microbatch. To do that:

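wordCounts.foreachRDD { (rdd, time) =>
  ...
  // saveAsTextFile (singular) is the RDD method; the batch time keeps each output path unique
  rdd.saveAsTextFile(s"/home/hadoop/datafile-${time.milliseconds}")
}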

Upvotes: 2
