Reputation: 89
I want to compute the percentage of each number in an RDD.
rdd1=sc.parallelize([1,2,3,4,1,5,7,3])
I tried
rdd2=rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))
and got rdd2.collect(): [(1, 2), (2, 1), (3, 2), (4, 1), (5, 1), (7, 1)]. Then I tried
percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1.count())))
print(percentage.collect())
but it raised an error at the print step, so I tried
percentage=rdd2.map(lambda x:(x[0],(x[1]/len(rdd1.collect()))))
print(percentage.collect())
and that also raised an error at the print step.
Upvotes: 0
Views: 474
Reputation: 161
From what you describe, you want the relative frequency of each unique member of the RDD.
from operator import add

rdd1 = sc.parallelize([1, 2, 3, 4, 1, 5, 7, 3])
count = rdd1.count()  # total number of elements, computed once on the driver

rdd2 = (rdd1
        .map(lambda x: (x, 1))    # [(1,1),(2,1),(3,1),(4,1),(1,1),(5,1),(7,1),(3,1)]
        .reduceByKey(add)         # [(1,2),(2,1),(3,2),(4,1),(5,1),(7,1)]
        .mapValues(lambda vSum: vSum / count))

rdd2.collect()
# [(1, 0.25), (2, 0.125), (3, 0.25), (4, 0.125), (5, 0.125), (7, 0.125)]
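For small result sets there is also a one-call route, sketched below: countByValue() returns the per-value counts to the driver as a plain dict, so the division happens in ordinary Python (the variable names counts, total, percentages are mine). The trade-off is that every distinct key must fit in driver memory.

counts = rdd1.countByValue()  # {1: 2, 2: 1, 3: 2, 4: 1, 5: 1, 7: 1}
total = rdd1.count()
percentages = {k: v / total for k, v in counts.items()}
# {1: 0.25, 2: 0.125, 3: 0.25, 4: 0.125, 5: 0.125, 7: 0.125}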
Upvotes: 2
Reputation: 7419
As SPARK-5603 notes, nested RDD operations are not supported: you cannot reference one RDD, or call an action such as count() on it, inside a transformation of another RDD, because the function you pass is serialized and shipped to the executors. If you run the count() action beforehand and keep the result, your code works.
rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
rdd2 = rdd1.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a + b)  # per-value counts
rdd1_len = rdd1.count()  # run the action first; the result is a plain int on the driver
percentage = rdd2.map(lambda x: (x[0], x[1] / rdd1_len))  # the closure captures the int, not the RDD
percentage.collect()
# [(1, 0.25), (2, 0.125), (3, 0.25), (4, 0.125), (5, 0.125), (7, 0.125)]
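As a side note, here is a sketch of a related pattern: if the precomputed total had to reach many tasks, or were a larger lookup structure, you could wrap it in a broadcast variable via the standard sc.broadcast API (the name total_bc is mine).

total_bc = sc.broadcast(rdd1.count())  # ship the driver-side total to executors once
percentage = rdd2.map(lambda x: (x[0], x[1] / total_bc.value))

For a single int this changes little, but it makes explicit that only plain values, never RDDs, travel inside a closure.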
Upvotes: 2