Edamame

Reputation: 25396

PySpark: append/merge PythonRDD to a PySpark dataframe

I am using the following code to train a clustering model and then assign each record to a cluster:

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors

spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random")

result = model.predict(rdd)

How do I append the predicted result back to the spark_df as an additional column? Thanks!

Upvotes: 2

Views: 1178

Answers (1)

zero323

Reputation: 330383

pyspark.mllib.clustering.KMeansModel is one of the rare models that can be used directly inside a PySpark transformation, so you can simply map with predict:

rdd.map(lambda point: (model.predict(point), point))

In the general case, when that is not possible, zip is the right tool for the job:

rdd.zip(model.predict(rdd))
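
To get the predictions back into spark_df as a column, one option is to zip the DataFrame's own rows with the predictions and rebuild the DataFrame. The following is only a sketch under some assumptions: with_cluster and append_predictions are hypothetical helper names, "cluster" is an arbitrary column name, every column is numeric as in the question, and sql_context stands for the sqlContext used there. It relies on predict being applied as a plain map over the RDD, so the rows and predictions stay aligned for zip.

```python
def with_cluster(pair):
    """Flatten one (row, cluster) pair produced by zip into a single tuple."""
    row, cluster = pair
    # A Row is a tuple subclass, so tuple(row) keeps the original values in order
    return tuple(row) + (int(cluster),)


def append_predictions(spark_df, model, sql_context, column="cluster"):
    """Return a new DataFrame with the predicted cluster as an extra column.

    Hypothetical helper: assumes every column of spark_df is numeric.
    """
    # Imported lazily so with_cluster can be used without Spark installed
    from pyspark.mllib.linalg import Vectors

    # Same feature vectors as in the question, built from the DataFrame rows
    features = spark_df.rdd.map(
        lambda row: Vectors.dense([float(c) for c in row])
    )
    # Pair each original row with its predicted cluster, then flatten the pair
    merged = spark_df.rdd.zip(model.predict(features)).map(with_cluster)
    # Reuse the original column names and add the new one
    return sql_context.createDataFrame(merged, spark_df.columns + [column])
```

With the names from the question this would be called as append_predictions(spark_df, model, sqlContext). The flattening step itself is ordinary Python: with_cluster(((1.0, 2.0), 0)) returns (1.0, 2.0, 0).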

Upvotes: 1
