Reputation: 2074
I am performing a join, and my data is spread across more than 100 nodes. I have a small list of key/value pairs that I am joining with another set of key/value pairs.
My list looks like this:
[[1, 0], [2, 0], [3, 0], [4, 0], [5, 0], [6, 0], [7, 0], [8, 0], [9, 0], [10, 0], [11, 0], [16, 0], [18, 0], [19, 0], [20, 0], [21, 0], [22, 0], [23, 0], [24, 0], [25, 0], [26, 0], [27, 0], [28, 0], [29, 0], [36, 0], [37, 0], [38, 0], [39, 0], [40, 0], [41, 0], [42, 0], [44, 0], [46, 0]]
I have a broadcast variable:
numB = sc.broadcast(numValuesKV)
When I do my join:
numRDD = columnRDD.join(numB.value)
I get the following error:
AttributeError: 'list' object has no attribute 'map'
Upvotes: 3
Views: 4266
Reputation: 1257
You are broadcasting a list, which is absolutely fine.
What you need to do is:
b = sc.broadcast(lst)
rdd.filter(lambda t: t[0] in b.value)
Here each t is a (key, value) pair like (1, 0), so t[0] is the key being checked against the broadcast list. But I hope you got the idea...
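A minimal self-contained sketch of that approach; lst and the rdd contents here are assumed sample data, not from the question:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

lst = [1, 2, 5]                              # keys to keep (assumed sample data)
rdd = sc.parallelize([(1, 'a'), (3, 'b'), (5, 'c')])

b = sc.broadcast(set(lst))                   # a set makes the membership test O(1)
kept = rdd.filter(lambda t: t[0] in b.value)

print(kept.collect())                        # [(1, 'a'), (5, 'c')]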
Upvotes: 2
Reputation: 37435
rdd.join(other)
is meant to join two RDDs, so it expects other
to be an RDD. To use the efficient 'small table broadcast' join trick, you need to do the join 'by hand'. In Scala, it would look like this:
rdd.mapPartitions { iter =>
  val valueMap = numB.value.toMap
  iter.map { case (k, v) => (k, (v, valueMap(k))) }
}
This applies the join using the broadcast value to each partition of the RDD in a distributed manner.
PySpark code should be pretty similar.
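For example, a hedged PySpark sketch of the same hand-rolled broadcast join, reusing the names from the question; columnRDD and numValuesKV here are assumed sample data:

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

columnRDD = sc.parallelize([(1, 'x'), (2, 'y'), (99, 'z')])
numValuesKV = [[1, 0], [2, 0], [3, 0]]

numB = sc.broadcast(dict(numValuesKV))       # a dict gives O(1) lookups per key

def join_partition(partition):
    value_map = numB.value
    for k, v in partition:
        if k in value_map:                   # drop keys missing from the broadcast map
            yield (k, (v, value_map[k]))     # inner-join semantics

numRDD = columnRDD.mapPartitions(join_partition)
print(numRDD.collect())                      # [(1, ('x', 0)), (2, ('y', 0))]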
Upvotes: 1