theMadKing

Reputation: 2074

PySpark Broadcast Variable Join

I am performing a join, and my data is spread across over 100 nodes. I have a small list of key/value pairs that I am joining with another key/value RDD.

My list looks like such:

[[1, 0], [2, 0], [3, 0], [4, 0], [5, 0], [6, 0], [7, 0], [8, 0], [9, 0], [10, 0], [11, 0], [16, 0], [18, 0], [19, 0], [20, 0], [21, 0], [22, 0], [23, 0], [24, 0], [25, 0], [26, 0], [27, 0], [28, 0], [29, 0], [36, 0], [37, 0], [38, 0], [39, 0], [40, 0], [41, 0], [42, 0], [44, 0], [46, 0]]

I have a broadcast variable:

numB = sc.broadcast(numValuesKV)

When I do my join:

numRDD = columnRDD.join(numB.value)

I get the following error:

AttributeError: 'list' object has no attribute 'map'

Upvotes: 3

Views: 4266

Answers (3)

ayan guha

Reputation: 1257

You are broadcasting a list, which is absolutely fine.

What you need to do is:

b = sc.broadcast(lst)
rdd.filter(lambda t: t[0] in b.value)

Note that b.value holds the broadcast list, whose elements look like [1, 0], etc., so t[0] must match that shape for the membership test to succeed. But I hope you got the idea....
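As a concrete sketch with the question's names (assuming columnRDD is an RDD of (key, value) tuples and numValuesKV is the list above), you can extract the keys into a set first, so the membership test compares keys rather than [key, value] pairs:

numB = sc.broadcast(numValuesKV)
keys = set(k for k, v in numB.value)               # keys from the broadcast [key, value] list
numRDD = columnRDD.filter(lambda t: t[0] in keys)  # keep rows whose key appears in the small list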

Upvotes: 2

maasg

Reputation: 37435

rdd.join(other) is meant to join two RDDs, so it expects other to be an RDD. To use the efficient 'small table broadcast' join trick, you need to do the join 'by hand'. In Scala, it would look like this:

rdd.mapPartitions { iter =>
  // build the lookup table once per partition from the broadcast list
  val valueMap = numB.value.toMap
  iter.map { case (k, v) => (k, (v, valueMap(k))) }
}

This applies the join using the broadcast value to each partition of the RDD in a distributed manner.

PySpark code should be pretty similar.
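A minimal PySpark sketch of the same per-partition join, assuming the sc, columnRDD, and numB names from the question (the join_partition helper name is hypothetical; skipping keys absent from the broadcast list makes this behave like an inner join):

def join_partition(iterator):
    value_map = dict(numB.value)         # small lookup table, built once per partition
    for k, v in iterator:
        if k in value_map:               # drop keys not present in the broadcast list
            yield (k, (v, value_map[k]))

numRDD = columnRDD.mapPartitions(join_partition)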

Upvotes: 1

urug

Reputation: 415

Can you try making numValuesKV a dictionary and see if it works?
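A hedged sketch of that idea, reusing the question's names: broadcast a dict built from numValuesKV and look values up per record instead of calling join on a list:

numB = sc.broadcast(dict(numValuesKV))   # dict gives O(1) lookups by key
numRDD = columnRDD.map(lambda kv: (kv[0], (kv[1], numB.value.get(kv[0]))))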

Upvotes: 1
