Reputation: 185
I am currently trying to perform LDA on a spark cluster. I have an RDD such that
>>> myRdd.take(2)
[(218603, [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]), (95680, [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0])]
but calling
model = LDA.train(myRdd, k=5, seed=42)
gives the following error from a worker:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5874.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5874.0): java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.List
I do not know how to interpret this error aside from the obvious, so any advice would be appreciated; the documentation on MLlib's LDA is rather sparse.
I obtain the RDD from the following process, beginning with a dataframe document_instances that has columns "doc_label" and "terms":
from pyspark.ml.feature import HashingTF

# Hash each document's terms into a fixed-width term-frequency vector
hashingTF = HashingTF(inputCol="terms", outputCol="term_frequencies", numFeatures=10)
tf_matrix = hashingTF.transform(document_instances)
myRdd = tf_matrix.select("doc_label", "term_frequencies").rdd
Using this directly gives the same error. Now, this uses HashingTF from pyspark.ml.feature, so I suspected a conflict caused by the difference between the Vector class in pyspark.mllib.linalg and the one in pyspark.ml.linalg, but converting explicitly with the Vectors.fromML() function gives the same error, as does each of the following variants (a standalone sketch of that conversion appears after them):
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
    (old_row.doc_label, old_row.term_frequencies.toArray().tolist()))

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
    (old_row.doc_label, old_row.term_frequencies.toArray()))

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
    (old_row.doc_label, Vectors.fromML(old_row.term_frequencies)))

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
    (old_row.doc_label, old_row.term_frequencies))
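To be explicit about the conversion I mean, here is a minimal standalone sketch with made-up vector values:

from pyspark.ml.linalg import Vectors as MLVectors
from pyspark.mllib.linalg import Vectors as MLlibVectors

# A pyspark.ml vector, like the ones HashingTF emits
ml_vec = MLVectors.dense([0.0, 0.0, 1.0, 0.0])

# Convert it to the pyspark.mllib vector type that MLlib's LDA expects
mllib_vec = MLlibVectors.fromML(ml_vec)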
Upvotes: 1
Views: 281
Reputation: 185
So, it turns out the Spark documentation is a bit misleading when it says "RDD of documents, which are tuples of document IDs and term (word) count vectors." Perhaps I misunderstood, but changing the tuple to a list makes this error disappear (although it seems to have been replaced by a different error). Presumably this is because the unpickler on the JVM side turns a Python tuple into an Object[] but a Python list into a java.util.List, which is exactly the cast the exception complains about.
Changing
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
    (old_row.doc_label, old_row.term_frequencies))
to
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
    [old_row.doc_label, Vectors.fromML(old_row.term_frequencies)])
appears to resolve the problem asked about (with Vectors imported from pyspark.mllib.linalg); I found the fix by comparing against the example code in the documentation:
http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA
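For completeness, here is a minimal end-to-end sketch of the working pipeline as I understand it. The documents are made up, the app name is arbitrary, and I have only verified the shape of the data, not this exact script:

from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import LDA

spark = SparkSession.builder.appName("lda-example").getOrCreate()

# Made-up stand-in for the real document_instances dataframe
document_instances = spark.createDataFrame(
    [(218603, ["alpha", "beta", "alpha"]),
     (95680, ["gamma", "beta"])],
    ["doc_label", "terms"])

hashingTF = HashingTF(inputCol="terms", outputCol="term_frequencies",
                      numFeatures=10)
tf_matrix = hashingTF.transform(document_instances)

# The two fixes: emit a *list* [id, vector] rather than a tuple, and
# convert the pyspark.ml vector to a pyspark.mllib vector with fromML
myRdd = tf_matrix.select("doc_label", "term_frequencies").rdd.map(
    lambda old_row: [old_row.doc_label,
                     Vectors.fromML(old_row.term_frequencies)])

model = LDA.train(myRdd, k=5, seed=42)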
Upvotes: 1