user3038457

Reputation: 185

Pyspark mllib LDA error: Object cannot be cast to java.util.List

I am currently trying to perform LDA on a Spark cluster. I have an RDD such that

>>> myRdd.take(2)
[(218603, [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0]), (95680, [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0])]

but calling

model = LDA.train(myRdd, k=5, seed=42)

gives the following error from a worker:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5874.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5874.0): java.lang.ClassCastException: [Ljava.lang.Object; cannot be cast to java.util.List

I do not know how to interpret this error beyond the obvious, so any advice would be appreciated; the documentation on MLlib's LDA is rather sparse.

I obtain the RDD from the following process, beginning with a DataFrame document_instances that has columns "doc_label" and "terms":

hashingTF = HashingTF(inputCol="terms", outputCol="term_frequencies", numFeatures=10)
tf_matrix = hashingTF.transform(document_instances)
myRdd = tf_matrix.select("doc_label", "term_frequencies").rdd
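For reference, a minimal self-contained version of this setup might look like the following; the SparkSession setup and sample documents are made up here, and only the column names and parameters come from the question:

    # Hypothetical reproduction of the setup above; sample data is illustrative only.
    from pyspark.sql import SparkSession
    from pyspark.ml.feature import HashingTF

    spark = SparkSession.builder.getOrCreate()

    # Columns "doc_label" and "terms" as described in the question
    document_instances = spark.createDataFrame(
        [(218603, ["spark", "lda", "topic"]),
         (95680,  ["cluster", "topic", "model"])],
        ["doc_label", "terms"])

    hashingTF = HashingTF(inputCol="terms", outputCol="term_frequencies", numFeatures=10)
    tf_matrix = hashingTF.transform(document_instances)
    myRdd = tf_matrix.select("doc_label", "term_frequencies").rdd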

Using this RDD directly gives the same error. Now, since this uses HashingTF from pyspark.ml.feature, I suspected there might be a conflict caused by the difference between the Vector class in mllib and the one in ml, but mapping directly with the Vectors.fromML() function gives the same error, as does using

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies.toArray().tolist()))
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies.toArray()))
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, Vectors.fromML(old_row.term_frequencies)))
myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies))

Upvotes: 1

Views: 281

Answers (1)

user3038457

Reputation: 185

So, it turns out that the Spark documentation is a bit misleading when it says "RDD of documents, which are tuples of document IDs and term (word) count vectors." Perhaps I misunderstood, but changing the tuple to a list makes this error disappear (although it seems to have been replaced by a different error).

Changing

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    (old_row.term, old_row.term_frequencies))

to

myRdd = tf_matrix.select(...).rdd.map(lambda old_row: \
                                    [old_row.term, Vectors.fromML(old_row.term_frequencies)])

appears to resolve the problem in the question, after comparing with the example code in the documentation:

http://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.LDA
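Putting the pieces together, a sketch of the full fix might look like the following; it assumes the ID column selected is "doc_label" as in the question, k and seed are taken from the question, and the describeTopics call is only there to show that the model trains:

    # Hedged end-to-end sketch: emit a list per document and convert the ml vector
    # to an mllib vector before calling LDA.train.
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.clustering import LDA

    corpus = tf_matrix.select("doc_label", "term_frequencies").rdd.map(
        lambda old_row: [old_row.doc_label, Vectors.fromML(old_row.term_frequencies)])

    model = LDA.train(corpus, k=5, seed=42)
    print(model.describeTopics(maxTermsPerTopic=3))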

Upvotes: 1
