Reputation: 103
I'm trying to implement k-means method using scala. I created a RDD something like that
val df = sc.parallelize(data).groupByKey().collect().map((chunk)=> {
sc.parallelize(chunk._2.toSeq).toDF()
})
val examples = df.map(dataframe =>{
dataframe.selectExpr(
"avg(time) as avg_time",
"variance(size) as var_size",
"variance(time) as var_time",
"count(size) as examples"
).rdd
})
val rdd_final=examples.reduce(_ union _)
val kmeans= new KMeans()
val model = kmeans.run(rdd_final)
With this code I obtain an error
type mismatch;
[error] found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
[error] required:org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
So I tried to cast doing:
val rdd_final_Vector = rdd_final.map{x:Row => x.getAs[org.apache.spark.mllib.linalg.Vector](0)}
val model = kmeans.run(rdd_final_Vector)
But then I obtain an error:
java.lang.ClassCastException: java.lang.Double cannot be cast to org.apache.spark.mllib.linalg.Vector
So I'm looking for a way to do that cast, but I can't find any method.
Any idea?
Best regards
Upvotes: 0
Views: 4792
Reputation: 63032
At least a couple of issues here:
Spark SQL
. A Vector
is not a native spark sql typeKMeans
: the SQL is performing aggregations. But KMeans
expects a series of individual data points in the form a Vector (which encapsulates an Array[Double]
) . So then - why are you supplying sum
's and average
's to a KMeans
operation?Addressing just #1 here: you will need to do something along the lines of:
val doubVals = <rows rdd>.map{ row => row.getDouble("colname") }
val vector = Vectors.toDense{ doubVals.collect}
Then you have a properly encapsulated Array[Double]
(within a Vector) that can be supplied to Kmeans
.
Upvotes: 2