Reputation: 95
I'm implementing a k-means algorithm in Spark. When I run the code below, I get a pickling error (shown below). If I modify the code so that everything sits outside of the loop, it correctly calculates the centroids.
sc = SparkContext(appName="Document Similarity")
lines = sc.wholeTextFiles(sys.argv[1])
articles = lines.flatMap(lambda x: re.split(r' ', x[1]))
shingles = articles.flatMap(shingle_pairs.get_pairs)
sig_vecs = shingles.groupBy(lambda x: x[1]) \
                   .map(lambda x: sig_vector.create_vector(x, a, b, n, p))
centroids = k_means.init_centroids(sig_size, k)

for i in range(max_it):
    # assign documents to closest cluster
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))
    # get count by key to use in mean calculation for new clusters
    doc_count = docs.countByKey()
    # recompute cluster centroids
    reduced_docs = docs.reduceByKey(k_means.reducer)
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count))
The error is as follows:
pickle.PicklingError: Could not serialize object: Exception:
It appears that you are attempting to broadcast an RDD or reference an
RDD from an action or transformation. RDD transformations and actions
can only be invoked by the driver, not inside of other transformations;
for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid
because the values transformation and count action cannot be performed
inside of the rdd1.map transformation. For more information, see SPARK-5063.
Upvotes: 2
Views: 1428
Reputation: 36
As explained in SPARK-5063, "Spark does not support nested RDDs". You are trying to access centroids (an RDD) inside a map on sig_vecs (another RDD):
docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))
Converting centroids to a local collection (collect?) and adjusting classify_docs accordingly should address the problem.
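A minimal sketch of that change, assuming init_centroids and the final mapper step produce RDDs and that classify_docs can be adjusted to take a plain Python list of centroids (the helper names and signatures are taken from the question and not verified):

# pull the initial centroids to the driver as a local list
centroids = k_means.init_centroids(sig_size, k).collect()

for i in range(max_it):
    # centroids is now a plain list, so the closure shipped to the executors
    # no longer references an RDD
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))

    # count of documents per cluster, used when averaging
    doc_count = docs.countByKey()

    # recompute the centroids and collect() them so the next iteration
    # again closes over a local list instead of an RDD
    reduced_docs = docs.reduceByKey(k_means.reducer)
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count)).collect()

If the centroid list is large, sc.broadcast(centroids) and reading the broadcast's .value inside classify_docs would avoid shipping the list with every task.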
Upvotes: 2