Reputation: 11
I followed this post to run KMeans in parallel. I used Python 2.7 and Spark 2.0.2 on EMR.
How to run multiple jobs in one Sparkcontext from separate threads in PySpark?
As quoted in the post, jobs submitted from different processes should not affect each other.
Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. By “job”, in this section, we mean a Spark action (e.g. save, collect) and any tasks that need to run to evaluate that action. Spark’s scheduler is fully thread-safe and supports this use case to enable applications that serve multiple requests (e.g. queries for multiple users)." http://spark.apache.org/docs/latest/job-scheduling.html
However the resulting model's clusters number K is different from what was passed in.
Code:
from pyspark.ml.clustering import KMeans
from sklearn.datasets.samples_generator import make_blobs
from pyspark.ml.linalg import Vectors
import random
random.seed(1)
group_size = 30
n_groups = 20
n_samples= n_groups * group_size
n_features=2
n_centers=4
xs, ys = make_blobs(n_samples=n_samples, n_features=n_features, centers=n_centers, cluster_std=1.0, center_box=(-10.0, 10.0), shuffle=True, random_state=None)
x_groups = []
for i in range(n_groups):
x_groups.append(xs[i*group_size: (i+1)*group_size])
def do_kmean(xs):
data = []
for x in xs:
data.append((Vectors.dense(x.tolist()),) )
df = spark.createDataFrame(data, ["features"])
num_clusters = random.randint(5,10)
kmeans = KMeans(k=num_clusters, maxIter=1, seed=1, featuresCol="features", predictionCol="prediction")
model = kmeans.fit(df)
return [num_clusters, kmeans.getK()]
from multiprocessing.pool import ThreadPool
tpool = ThreadPool(processes=8)
result = tpool.map(do_kmean, x_groups)
Result: (Input K vs what KMeans actually used)
[[5, 9],
[8, 9],
[6, 8],
[10, 9],
[7, 9],
[9, 9],
[7, 9],
[9, 9],
[5, 5],
[5, 9],
[9, 7],
[9, 9],
[5, 7],
[10, 5],
[7, 7],
[7, 7],
[6, 6],
[10, 10],
[10, 10],
[5, 5]]
It seems Spark is not thread/process safe and accessing other process's copy of the K. Could any of the Spark configuration cause this problem or is this a Spark bug?
Upvotes: 0
Views: 537
Reputation: 11
This is indeed a bug for Spark 2.0.2 and 2.1.0. I was able to replicate the bug on my local machine with above two versions. The bug is fixed for Spark 2.1.1.
https://issues.apache.org/jira/browse/SPARK-19348
Upvotes: 1