ar_mm18

Reputation: 465

K means Clustering using PySpark

I have a very big data frame and I have to use 8 specific columns from it, where the values are either "strongly agree", "agree" or "disagree". Based on these 8 columns, I need to create a new column which tells which cluster (1-8) the row belongs to (preferably by using K-means clustering).

Is this possible in PySpark?

PS: I am new to PySpark; any help would be highly appreciated.

column:1    column:2    column:3    column:4    column:5  ...  column:8    new_column_required?
--------    --------    --------    --------    --------       --------    --------------------
agree       disagree    agree       agree       disagree       disagree    cluster1?
disagree    NaN         disagree    disagree    NaN            agree       NaN?
...         ...         ...         ...         ...            ...         ...
agree       disagree    agree       agree       disagree       agree       cluster7?

Upvotes: 2

Views: 878

Answers (2)

werner

Reputation: 14845

Step 1: Generate Test Data

Create some (almost) random test data.

# 8 input columns and 100 test rows.
cols = [f'col{i}' for i in range(1, 9)]
rows = 100

def create_data():
    from random import random
    for i in range(0, rows):
        # The probability of "agree" grows with the row index; about 5%
        # of the remaining values are left as None to simulate missing data.
        yield ['agree' if random() < i/rows else 'disagree' if random() < 0.95 else None for c in cols]

df = spark.createDataFrame(list(create_data()), cols)

Step 2: Transform the Strings

The agree/disagree strings cannot be handled by the VectorAssembler in step 3, so the strings are transformed into numeric values. Here we treat the Null/NaN values as a third category.

boolean_cols = [f'{c}_bool' for c in cols]
df2 = df.selectExpr(cols + [
    f'if({c} = "agree", 1.0, if({c} = "disagree", 2.0, 3.0)) as {b}'
    for c, b in zip(cols, boolean_cols)])

Using a StringIndexer would also be an option, but as there are only two distinct strings this might be a bit over-engineered.
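For completeness, here is a minimal sketch of that alternative, assuming Spark 3.0+ where StringIndexer accepts multiple input columns; handleInvalid="keep" puts null values into an extra bucket, mirroring the third category above (df2_alt is just an illustrative name):

from pyspark.ml.feature import StringIndexer

# Index all 8 string columns at once; nulls land in an additional bucket.
indexer = StringIndexer(inputCols=cols, outputCols=boolean_cols,
                        handleInvalid="keep")
df2_alt = indexer.fit(df).transform(df)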

Step 3: Create a Feature Column

PySpark's K-Means implementation expects the features in a single vector column. Use a VectorAssembler for this task.

from pyspark.ml.feature import VectorAssembler
df3 = VectorAssembler(inputCols=boolean_cols, outputCol="features").transform(df2)

Step 4: Finally, Run the Clustering Algorithm

from pyspark.ml.clustering import KMeans

# Cluster into k=8 groups, with a fixed seed for reproducibility.
kmeans = KMeans(k=8).setSeed(1)
kmeans.setMaxIter(10)
model = kmeans.fit(df3)
predictions = model.transform(df3)

After removing the intermediate columns from the output, we get:

predictions.select(cols + ['prediction']).show()
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
|    col1|    col2|    col3|    col4|    col5|    col6|    col7|    col8|prediction|
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
[...]
|disagree|   agree|disagree|   agree|   agree|disagree|disagree|disagree|         3|
|disagree|disagree|disagree|disagree|disagree|disagree|disagree|disagree|         1|
|disagree|disagree|disagree|disagree|disagree|disagree|   agree|disagree|         5|
|disagree|   agree|   agree|   agree|disagree|disagree|disagree|   agree|         3|
|   agree|   agree|   agree|disagree|disagree|   agree|disagree|disagree|         6|
[...]
|   agree|   agree|   agree|   agree|   agree|   agree|   agree|   agree|         7|
|   agree|   agree|   agree|   agree|   agree|disagree|   agree|   agree|         2|
|   agree|   agree|   agree|   agree|   agree|   agree|   agree|   agree|         7|
+--------+--------+--------+--------+--------+--------+--------+--------+----------+
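Note that KMeans numbers its clusters 0 to k-1, so the prediction values are 0-7 rather than the 1-8 asked for in the question. If a differently named column is wanted, the prediction column can simply be renamed (the name cluster below is just an illustration):

result = predictions.select(cols + ['prediction']) \
    .withColumnRenamed('prediction', 'cluster')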

Upvotes: 2

ASH

Reputation: 20302

It should be something like this. Note that the snippet below is in Scala (it mirrors the example in the Spark docs linked at the end); the PySpark API is analogous.

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
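Since the question asks for PySpark, a rough Python translation of the Scala snippet above (assuming the same sample data file shipped with the Spark distribution) could look like this:

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads the sample data shipped with Spark.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans(k=2, seed=1)
model = kmeans.fit(dataset)

# Make predictions.
predictions = model.transform(dataset)

# Evaluate clustering by computing the Silhouette score.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

# Show the cluster centers.
print("Cluster Centers: ")
for center in model.clusterCenters():
    print(center)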

More info here.

https://spark.apache.org/docs/latest/ml-clustering.html

Here is the parent link.

https://spark.apache.org/docs/latest/ml-classification-regression.html

Upvotes: 1
