aaa

Reputation: 89

create pyspark rdd with lambda

I want to compute the percentage of occurrences of each number in an RDD.

rdd1=sc.parallelize([1,2,3,4,1,5,7,3])

I tried

rdd2=rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))

and rdd2.collect() returned [(1,2),(2,1),(3,2),(4,1),(5,1),(7,1)]. Then I tried:

percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1.count())))
print(percentage.collect())

This raised an error at the print step, so I tried:

percentage=rdd2.map(lambda x:(x[0],(x[1]/len(rdd1.collect()))))
print(percentage.collect())

This also raised an error at the print step.

Upvotes: 0

Views: 474

Answers (2)

Mario SG

Reputation: 161

From your description, I take it that you want the relative frequency of each distinct element of the RDD.

from operator import add

rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
count = rdd1.count()

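# Parentheses are required so the chained calls can span several lines.
rdd2 = (rdd1
    .map(lambda x: (x, 1))  # [(1,1),(2,1),(3,1),(4,1),(1,1),(5,1),(7,1),(3,1)]
    .reduceByKey(add)       # [(1,2),(2,1),(3,2),(4,1),(5,1),(7,1)]
    .mapValues(lambda vSum: vSum / count)  # count is a plain int computed on the driver
)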

rdd2.collect()
# [(1,2/8),(2,1/8),(3,2/8),(4,1/8),(5,1/8),(7,1/8)]
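
To sanity-check the expected fractions without a Spark session, the same arithmetic can be sketched in plain Python. This is only an illustration of the count / total computation on the sample list from the question, not a replacement for the RDD code; the names data, counts and fractions are made up for this example.

```python
from collections import Counter

# Same sample data as in the question.
data = [1, 2, 3, 4, 1, 5, 7, 3]

# Count occurrences of each value, then divide by the total number of items.
counts = Counter(data)
total = len(data)
fractions = {value: n / total for value, n in counts.items()}

print(sorted(fractions.items()))
# [(1, 0.25), (2, 0.125), (3, 0.25), (4, 0.125), (5, 0.125), (7, 0.125)]
```

Note that this relies on Python 3 division. Under Python 2, / on two ints truncates, so the total would have to be converted with float(total) first.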

Upvotes: 2

pissall

Reputation: 7419

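SPARK-5063 says that nested RDD operations are not supported.

You cannot reference an RDD or call an RDD action inside a transformation. In your code, rdd1.count() and rdd1.collect() are called inside the lambda passed to rdd2.map(), which is what fails. Because transformations are lazy, the error only shows up once collect() runs in the print step.

If you call the action count() beforehand, your code will work: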

rdd1 = sc.parallelize([1,2,3,4,1,5,7,3])
rdd2 = rdd1.map(lambda x: (x, 1)).reduceByKey(lambda current, next: (current+next))
rdd1_len = rdd1.count()
percentage=rdd2.map(lambda x:(x[0],(x[1]/rdd1_len)))

percentage.collect()
# [(1, 0.25), (2, 0.125), (3, 0.25), (4, 0.125), (5, 0.125), (7, 0.125)]
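
As a possible alternative, assuming the number of distinct values is small enough to fit on the driver, the counting can be done with the countByValue() action and the division with ordinary Python. This is a rough sketch rather than the definitive way to do it; value_fractions is a hypothetical helper name, and sc is assumed to be an existing SparkContext as in the question.

```python
def value_fractions(rdd):
    """Return {value: fraction of elements equal to value} for an RDD."""
    # countByValue() is an action: it returns a dict-like mapping of
    # value -> count to the driver, so no RDD is referenced inside a lambda.
    counts = rdd.countByValue()
    total = sum(counts.values())
    # Plain Python division on the driver (Python 3 true division).
    return {value: n / total for value, n in counts.items()}

# Usage, assuming a live SparkContext named sc:
# value_fractions(sc.parallelize([1, 2, 3, 4, 1, 5, 7, 3]))
```

Since everything after countByValue() runs on the driver, this avoids the nested-RDD problem entirely, at the cost of pulling all distinct values into local memory.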

Upvotes: 2
