Reputation: 3215
I am dabbling with Hadoop platforms, and one of the things I am experimenting with is the Spark Streaming API. I am trying to read a file stream and count the number of words every x seconds (an accumulated sum over the history). Now I want to print the top-k words to a file. Here is what I am trying to do:
# sort the dstream for current batch
sorted_counts = counts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
# get the top K values of each rdd from the transformed dstream
topK = sorted_counts.transform(lambda rdd: rdd.take(k))
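For reference, counts above is built along these lines (simplified sketch; the input path, batch interval, and update function here are placeholders for my actual setup):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="TopKWords")
ssc = StreamingContext(sc, 10)       # batch interval: the "x seconds"
ssc.checkpoint("checkpoint")         # required by updateStateByKey

lines = ssc.textFileStream(in_path)  # in_path: the monitored input directory

def update(new_values, running_count):
    # accumulate counts across batches (the history sum)
    return sum(new_values) + (running_count or 0)

counts = lines.flatMap(lambda line: line.split()) \
              .map(lambda word: (word, 1)) \
              .updateStateByKey(update)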
I am able to print the output to console/log file using:
sorted_counts.pprint(k)
But the problem is when I try to print it to a file using:
topK.saveAsTextFiles(out_path)
or even if I try to print topK to the console as:
topK.pprint()
I get the following error,
AttributeError: 'list' object has no attribute '_jrdd'
which I assume is because rdd.take(k) returns an actual Python list instead of an RDD. How can I work around this? Also, I want to generate a different file for each newly computed word count... i.e. a new output file every x seconds (which saveAsTextFiles() guarantees). I am using Python to program this, if it helps. Thanks!
Upvotes: 0
Views: 1090
Reputation: 2934
It seems that there is no API that would allow you to do that directly, but you can work around it:
rdd.zipWithIndex().filter(lambda x: x[1] < k).map(lambda x: x[0])
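Applied to each batch of the stream, that could look like this (a rough sketch, reusing the sorted_counts, k, and out_path from your question):
# stay inside transform() so the result is still a DStream of RDDs
topK = sorted_counts.transform(
    lambda rdd: rdd.zipWithIndex()              # ((word, count), index)
                   .filter(lambda x: x[1] < k)  # keep indices 0..k-1
                   .map(lambda x: x[0]))        # drop the index again

topK.saveAsTextFiles(out_path)  # writes a new output per batch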
Another solution would be (w/o sorting):
sc.parallelize(rdd.top(...))
This way you don't need to sort the whole RDD; you only take the biggest elements and then create a new RDD from them.
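In your streaming job that could look like this (again just a sketch; rdd.context is the SparkContext, so you don't need sc in scope inside transform()):
topK = counts.transform(
    lambda rdd: rdd.context.parallelize(
        rdd.top(k, key=lambda x: x[1])))  # k largest pairs by count, no full sort

topK.saveAsTextFiles(out_path)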
Upvotes: 1