eric6685
eric6685

Reputation: 95

Spark - pickling error when accessing RDD in loop

I'm implementing a k-means algorithm in Spark. When I run the code below I get a pickling error (shown below). If I modify it and place everything outside of the loop, it will correctly calculate centroids.

sc = SparkContext(appName="Document Similarity")
lines = sc.wholeTextFiles(sys.argv[1])

articles = lines.flatMap(lambda x: re.split(r' ',x[1]))

shingles = articles.flatMap(shingle_pairs.get_pairs)

sig_vecs = shingles.groupBy(lambda x: x[1]) \
                   .map(lambda x: sig_vector.create_vector(x, a, b, n, p))

centroids = k_means.init_centroids(sig_size, k)

for i in range(max_it):
    # assign documents to closest cluster
    docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))

    # get count by key to use in mean calculation for new clusters
    doc_count = docs.countByKey()

    # recompute cluster centroids
    reduced_docs = docs.reduceByKey(k_means.reducer)
    centroids = reduced_docs.map(lambda x: k_means.mapper(x, doc_count))

The error is as follows:

pickle.PicklingError: Could not serialize object: Exception:
It appears that you are attempting to broadcast an RDD or reference an 
RDD from an action or transformation. RDD transformations and actions 
can only be invoked by the driver, not inside of other transformations; 
for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid 
because the values transformation and count action cannot be performed 
inside of the rdd1.map transformation. For more information, see SPARK-5063.

Upvotes: 2

Views: 1428

Answers (1)

user8797795
user8797795

Reputation: 36

As explained in the SPARK-5063 "Spark does not support nested RDDs". You are trying to access centroids (RDD) in map on sig_vecs (RDD):

docs = sig_vecs.map(lambda x: k_means.classify_docs(x, centroids))

Converting centroids to a local collection (collect?) and adjusting classify_docs should address the problem.

Upvotes: 2

Related Questions