Pradip Gupta

Reputation: 579

Spark DataFrame grouping and partitioning by key with a set number of partitions

I have a Spark DataFrame with multiple labels and the features corresponding to each label, like this:

+----------------+--------------------+
|           label|       feature_paths|
+----------------+--------------------+
|         person1|[-0.015756417, 0....|
|         person1|[-0.05177306, 0.1...|
|         person1|[-0.11631858, 0.1...|
|         person2|[-0.058303248, 0....|
|         person2|[-0.03415013, 0.0...|
+----------------+--------------------+

I want to train a clustering model for each label (person). So basically, I want to create an RDD for each label and then run a map operation like rdd.map(service), which will eventually save a GMM model for each entity.

The code looks like this:

def service(rddentry):
    # Each entry is expected to be a (label, features) pair.
    label = rddentry[0]
    features = rddentry[1]

    print(label)

    # Imported inside the function so the imports happen on the executors.
    from sklearn.mixture import BayesianGaussianMixture
    from sklearn.externals import joblib

    # Fit a Bayesian GMM on this label's features and persist the model to disk.
    gmm = BayesianGaussianMixture(n_components=3, covariance_type="diag", init_params='kmeans')
    model = gmm.fit(features)
    joblib.dump(model, str(label) + '.joblib')

    return model
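
For context, this is roughly how I intend to invoke service once the per-label RDD exists (just a sketch; label_rdd below is the hypothetical RDD of (label, feature_list) pairs that I am trying to build):

# label_rdd is hypothetical here: an RDD of (label, list_of_feature_vectors)
# tuples with one entry per unique label. Calling an action forces service()
# to run, and save a model, for every label.
label_rdd.map(service).count()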

The goals I want to achieve are:

  1. Create an RDD where the number of partitions equals the number of unique labels, such that rdd.getNumPartitions() = no_of_unique_labels. Each RDD entry will hold multiple features, all belonging to a single label.

  2. Send each RDD partition to the service function.

My experiments so far:

  1. sdf.repartition('label') creates many empty partitions.

  2. sdf.partitionBy('label') also does not work; it creates a random number of partitions.

I have spent almost two days on this with no concrete results so far. Any help or guidance in the right direction would be much appreciated.

Upvotes: 3

Views: 3540

Answers (1)

Sathiyan S

Reputation: 1023

You can use partitionBy with new HashPartitioner(number_of_partitions).

One extra action is required to count the unique labels, and you can use that count as the number of required partitions.

Here is a sample. Note: you need a paired RDD to do this, so after repartitioning you can map over it to get just the necessary items from each tuple.

scala> import org.apache.spark.HashPartitioner
scala> val data = sc.parallelize(List("1","1","1","2","3","4","4","4"),4)
scala> data.glom.collect
res20: Array[Array[String]] = Array(Array(1, 1), Array(1, 2), Array(3, 4), Array(4, 4))
scala> val data_repart = data.keyBy(x=>x).partitionBy(new HashPartitioner(data.distinct.count.toInt))
scala> data_repart.glom.collect
res21: Array[Array[(String, String)]] = Array(Array((4,4), (4,4), (4,4)), Array((1,1), (1,1), (1,1)), Array((2,2)), Array((3,3)))
scala> data_repart.map(_._2).glom.collect
res22: Array[Array[String]] = Array(Array(4, 4, 4), Array(1, 1, 1), Array(2), Array(3))
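
Since the question itself is in PySpark, here is a rough Python sketch of the same idea (untested; the variable names are illustrative and the data is assumed to already be in an RDD):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

data = sc.parallelize(["1", "1", "1", "2", "3", "4", "4", "4"], 4)

# One extra action to count the distinct labels, then hash-partition a paired RDD
# into that many partitions.
num_labels = data.distinct().count()
data_repart = data.keyBy(lambda x: x).partitionBy(num_labels)

print(data_repart.glom().collect())                        # entries grouped per partition
print(data_repart.map(lambda kv: kv[1]).glom().collect())  # keep only the values

Note that hash partitioning guarantees that all records with the same key land in the same partition, but two different labels can still hash to the same partition, so a strict one-label-per-partition layout is not guaranteed.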

Let me know if it helps.

Upvotes: 1
