Reputation: 510
I am new to Spark, but I have some experience with Hadoop. I've been trying to adapt a Python script I use in Hadoop streaming that filters out some tweets in JSON format.
Normally, my function has a condition that prints to stdout the tweet if the condition is true and prints nothing otherwise.
def filter(tweet):
    if criteria(tweet) is True:
        print json.dumps(tweet)
This way, the final output file will contain only the tweets I want.
However, when trying to use Spark, I had to replace the print statement with a return, so the function returns the tweet if the condition is True and None otherwise.
def filter(tweet):
    if criteria(tweet) is True:
        return json.dumps(tweet)
The problem appears when I try to save the results to disk. Using PySpark's saveAsTextFile method, it saves not only the tweets I want but also the None values I return when the condition is False. How can I avoid writing None to the file, so that only the desired tweets are saved?
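The behavior described above can be reproduced with plain Python: saveAsTextFile writes the string form of each element, so a None result becomes the literal line "None". This is a minimal sketch, where criteria is a hypothetical predicate standing in for the real one:

```python
import json

def criteria(tweet):
    # hypothetical predicate: keep tweets whose text mentions "spark"
    return "spark" in tweet.get("text", "").lower()

def filter(tweet):
    if criteria(tweet) is True:
        return json.dumps(tweet)
    # implicitly returns None otherwise

tweets = [{"text": "spark"}, {"text": "lunch"}]

# What saveAsTextFile would write, one string per element:
lines = [str(filter(t)) for t in tweets]
```

Here lines contains the JSON string for the first tweet followed by the string "None" for the second, which is exactly what ends up in the output file.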
Many thanks in advance.
Jorge
Upvotes: 1
Views: 317
Reputation: 1538
If you use your function in a map, it will not reduce the number of elements you have. To drop elements, you must use the filter method to test whether an element is None after the map.
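A minimal sketch of the map-then-filter chain using plain Python lists (in PySpark the equivalent would be some_rdd.map(keep).filter(lambda x: x is not None)); criteria and keep are hypothetical names:

```python
import json

def criteria(tweet):
    # hypothetical predicate: keep tweets whose text mentions "spark"
    return "spark" in tweet.get("text", "").lower()

def keep(tweet):
    # map step: JSON string if the tweet matches, None otherwise
    return json.dumps(tweet) if criteria(tweet) else None

tweets = [{"text": "I love spark"}, {"text": "lunch"}]

# map step produces None entries, filter step removes them
mapped = [keep(t) for t in tweets]
result = [x for x in mapped if x is not None]
```

After the filter step, only the matching tweets remain, so nothing writes None to disk.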
Upvotes: 0
Reputation: 330353
A quite elegant solution, which avoids chaining filter and map, is to use flatMap:
def filter(tweet):
    return [json.dumps(tweet)] if criteria(tweet) is True else []

some_rdd.flatMap(filter)
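To see why this works, here is a plain-Python stand-in for flatMap (it concatenates the per-element lists, so empty lists simply vanish); criteria is a hypothetical predicate:

```python
import json

def criteria(tweet):
    # hypothetical predicate: keep tweets whose text mentions "spark"
    return "spark" in tweet.get("text", "").lower()

def filter(tweet):
    # one-element list if the tweet matches, empty list otherwise
    return [json.dumps(tweet)] if criteria(tweet) is True else []

tweets = [{"text": "spark rocks"}, {"text": "lunch"}]

# flatMap = map each element to a list, then flatten the lists
flat = [s for t in tweets for s in filter(t)]
```

The non-matching tweet contributes an empty list, so nothing for it appears in the flattened result, and saveAsTextFile would write only the desired tweets.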
Upvotes: 1