Reputation: 579
I have a Spark dataframe with multiple labels and the features corresponding to each, like this:
+----------------+--------------------+
| label| feature_paths|
+----------------+--------------------+
| person1|[-0.015756417, 0....|
| person1|[-0.05177306, 0.1...|
| person1|[-0.11631858, 0.1...|
| person2|[-0.058303248, 0....|
| person2|[-0.03415013, 0.0...|
+----------------+--------------------+
I want to train a clustering model for each label (person). Basically, I want to create an RDD for each label and then run a map operation like rdd.map(service), which will eventually save a GMM model for each entity.
The service function looks like this:
def service(rddentry):
    label = rddentry[0]
    features = rddentry[1]
    print(label)

    # imported inside the function so the workers can resolve them
    from sklearn.mixture import BayesianGaussianMixture
    from sklearn.externals import joblib

    # fit one GMM on all feature vectors of this label and persist it
    gmm = BayesianGaussianMixture(n_components=3, covariance_type="diag", init_params='kmeans')
    model = gmm.fit(features)
    joblib.dump(model, str(label) + '.joblib')

    return model
The goals I want to achieve are:
Create an RDD where the number of partitions equals the number of unique labels, such that rdd.getNumPartitions() == no_of_unique_labels. Each RDD entry will have multiple features, all belonging to a single label (see the toy sketch below).
Send each RDD partition to the service function.
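To make the first goal concrete, this is roughly the layout I am after (a toy sketch with made-up numbers, not my real data):
from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# one entry per label, holding all of that label's feature vectors,
# spread over one partition per label
wanted = sc.parallelize([
    ('person1', [[-0.015, 0.2], [-0.051, 0.1], [-0.116, 0.15]]),
    ('person2', [[-0.058, 0.3], [-0.034, 0.05]]),
], numSlices=2)

print(wanted.getNumPartitions())  # 2 == number of unique labels
# wanted.map(service)             # would then fit and save one GMM per person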
My experiments until now:
When doing sdf.repartition('label'), it creates several empty partitions.
sdf.partitionBy('label') also does not work; it creates a random number of partitions.
I have spent almost two days on this without any concrete results so far. Any help or guidance in the right direction would be appreciated.
Upvotes: 3
Views: 3540
Reputation: 1023
You can use partitionBy with a new HashPartitioner(number_of_partitions).
One extra action is required to count the unique labels; you can use that count as the number of required partitions.
Here is a sample. Note: you need a paired RDD to do this, so after repartitioning you can map over it to extract the values from the tuples.
scala> val data = sc.parallelize(List("1","1","1","2","3","4","4","4"),4)
scala> data.glom.collect
res20: Array[Array[String]] = Array(Array(1, 1), Array(1, 2), Array(3, 4), Array(4, 4))
scala> import org.apache.spark.HashPartitioner
scala> val data_repart = data.keyBy(x=>x).partitionBy(new HashPartitioner(data.distinct.count.toInt))
scala> data_repart.glom.collect
res21: Array[Array[(String, String)]] = Array(Array((4,4), (4,4), (4,4)), Array((1,1), (1,1), (1,1)), Array((2,2)), Array((3,3)))
scala> data_repart.map(_._2).glom.collect
res22: Array[Array[String]] = Array(Array(4, 4, 4), Array(1, 1, 1), Array(2), Array(3))
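If you are working from PySpark rather than the Scala shell, a rough equivalent could look like the sketch below. It is untested, reuses your sdf dataframe and service function from the question, and run_service is just a helper name I made up; PySpark's RDD.partitionBy hashes the keys by default, so it plays the role of HashPartitioner here.
# (label, features) pairs from your dataframe
pairs = sdf.rdd.map(lambda row: (row['label'], row['feature_paths']))

# one extra action to find out how many partitions we want
num_labels = pairs.keys().distinct().count()

# hash-partition by label: rows with the same label land in the same partition
by_label = pairs.partitionBy(num_labels)

def run_service(partition):
    # group within the partition, since two labels can still hash into the
    # same partition (and some partitions may stay empty)
    from collections import defaultdict
    groups = defaultdict(list)
    for label, features in partition:
        groups[label].append(features)
    return [service((label, feats)) for label, feats in groups.items()]

models = by_label.mapPartitions(run_service).collect()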
Let me know if it helps.
Upvotes: 1