I have two RDDs:
For each 'id' in RDD2, I want to calculate the mean score of the words in its text, counting only the words that have a score.
def predecir(texto):
    contador = 0
    prediccion = 0
    for palabra in texto:
        puntaje = listaRDD.lookup(palabra)
        if puntaje:
            puntaje = puntaje[0]
            prediccion += puntaje
            contador += 1
    return (float(prediccion)/ contador)

listaTestRDD = listaTestRDD.map(lambda x: (x[0], predecir(x[1])))
print listaTestRDD.take(1)
And I get this error message:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
How can I solve this? Is it not possible to use one RDD inside a transformation of another? How can I convert RDD1 to a dictionary so that I can look up a word in O(1)?
Try:
RDD2.flatMapValues(lambda x: x) \
    .map(lambda x: (x[1], x[0])) \
    .leftOuterJoin(RDD1) \
    .values() \
    .map(lambda x: (x[0], (x[1], 1) if x[1] is not None else (0, 0))) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
    .mapValues(lambda x: x[0] / float(x[1]) if x[1] else 0.0)
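As for the dictionary part of the question: if RDD1 is small enough to fit in driver memory, one possible alternative is to collect it into a plain dict and do the lookups there instead of calling lookup on an RDD inside map. The sketch below shows only the per-row logic in plain Python so it runs without Spark. It assumes RDD1 holds (word, score) pairs and RDD2 holds (id, list of words) pairs; the name mean_score and the sample values are made up for illustration.

```python
def mean_score(words, scores):
    """Mean score of the words found in `scores`; 0.0 if none are found."""
    found = [scores[w] for w in words if w in scores]
    return sum(found) / float(len(found)) if found else 0.0


# Stand-ins for the real data (hypothetical values, for illustration only).
scores = {"good": 4.0, "bad": 1.0}  # shape of what RDD1.collectAsMap() returns
rows = [(1, ["good", "bad", "unknown"]), (2, ["unknown"])]  # shape of RDD2

result = [(doc_id, mean_score(words, scores)) for doc_id, words in rows]
print(result)

# With Spark, the same function could be wired in roughly like this
# (assuming `sc` is the active SparkContext):
#   scores_bc = sc.broadcast(RDD1.collectAsMap())
#   RDD2.mapValues(lambda words: mean_score(words, scores_bc.value))
```

Returning 0.0 when no word has a score matches the join version above and avoids the division by zero that predecir would hit in that case. If RDD1 is too large to collect, the join is the safer route.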