Knows Not Much
Knows Not Much

Reputation: 31546

Call Distinct on 'pyspark.resultiterable.ResultIterable'

I am writing some spark code and I have an RDD which looks like

[(4, <pyspark.resultiterable.ResultIterable at 0x9d32a4c>), 
 (1, <pyspark.resultiterable.ResultIterable at 0x9d32cac>), 
 (5, <pyspark.resultiterable.ResultIterable at 0x9d32bac>), 
 (2, <pyspark.resultiterable.ResultIterable at 0x9d32acc>)] 

What I need to do is to call a distinct on the pyspark.resultiterable.ResultIterable

I tried this

def distinctHost(a, b):
  p = sc.parallelize(b)
  return (a, p.distinct())

mydata.map(lambda x: distinctHost(*x))

But I get an error:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transforamtion. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

The error is self explanatory that I cannot use sc. But I need to find a way to cover the pyspark.resultiterable.ResultIterable to an RDD so that I can call distinct on it.

Upvotes: 4

Views: 1704

Answers (1)

zero323
zero323

Reputation: 330303

Straightforward approach is to use sets:

from numpy.random import choice, seed
seed(323)

keys = (4, 1, 5, 2)
hosts = [
    u'in24.inetnebr.com',
    u'ix-esc-ca2-07.ix.netcom.com',
    u'uplherc.upl.com',
    u'slppp6.intermind.net',
    u'piweba4y.prodigy.com'
]

pairs = sc.parallelize(zip(choice(keys, 20), choice(hosts, 20))).groupByKey()
pairs.map(lambda (k, v): (k, set(v))).take(3)

Result:

[(1, {u'ix-esc-ca2-07.ix.netcom.com', u'slppp6.intermind.net'}),
 (2,
  {u'in24.inetnebr.com',
   u'ix-esc-ca2-07.ix.netcom.com',
   u'slppp6.intermind.net',
   u'uplherc.upl.com'}),
 (4, {u'in24.inetnebr.com', u'piweba4y.prodigy.com', u'uplherc.upl.com'})]

If there is a particular reason for using rdd.disinct you can try something like this:

def distinctHost(pairs, key):
    return (pairs
        .filter(lambda (k, v): k == key)
        .flatMap(lambda (k, v): v)
        .distinct())

[(key, distinctHost(pairs, key).collect()) for key in pairs.keys().collect()]

Upvotes: 2

Related Questions