Reputation: 99
I have this dataset (here are a few rows):
11.97,1355,401
3.49,25579,12908
9.29,129186,10882
28.73,10153,22356
3.69,22872,9798
13.49,160371,2911
24.36,106764,867
3.99,163670,16397
19.64,132547,401
I'm trying to assign all these rows to 4 clusters using K-Means. For that I'm using the code from this post: Spark MLLib Kmeans from dataframe, and back again
val data = sc.textFile("/user/cloudera/TESTE1")
val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
val clusters = KMeans.train(idPointRDD.map(_._2), 4, 20)
val clustersRDD = clusters.predict(idPointRDD.map(_._2))
val idClusterRDD = idPointRDD.map(_._1).zip(clustersRDD)
val idCluster = idClusterRDD.toDF("purchase","id","product","cluster")
I'm getting this output:
scala> import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
scala> import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.Vectors
scala> val data = sc.textFile("/user/cloudera/TESTE")
data: org.apache.spark.rdd.RDD[String] = /user/cloudera/TESTE MapPartitionsRDD[7] at textFile at <console>:29
scala> val idPointRDD = data.map(s => (s(0), Vectors.dense(s(1).toInt,s(2).toInt))).cache()
idPointRDD: org.apache.spark.rdd.RDD[(Char, org.apache.spark.mllib.linalg.Vector)] = MapPartitionsRDD[8] at map at <console>:31
But when I run it I'm getting the following error:
java.lang.UnsupportedOperationException: Schema for type Char is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:715)
How can I solve this problem?
Many thanks!
Upvotes: 0
Views: 547
Reputation: 40380
Here is the thing. You are reading a CSV file into an RDD of String without splitting the lines into fields or converting those fields to numeric values. Since a String is a collection of characters, calling s(0), for example, returns the first Char of the line rather than the first field. The code still compiles, because s(1).toInt converts that Char to an integer (its character code), but that is not what you are looking for.
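To illustrate the difference, here is a minimal sketch in plain Scala (no Spark needed) using the first row from the question. The names line, firstChar, charAsInt and fields are only for illustration, and the field names purchase, id and product are assumed from the column names passed to toDF in the question:

```scala
// One row from the question's dataset, as sc.textFile would deliver it.
val line = "11.97,1355,401"

// Indexing a String yields a single Char, not a CSV field.
val firstChar: Char = line(0)       // the character '1'
val charAsInt: Int = line(1).toInt  // 49, the character code of '1'

// Splitting on the comma yields the actual fields.
val fields: Array[String] = line.split(",")
val purchase: Double = fields(0).toDouble
val id: Int = fields(1).toInt
val product: Int = fields(2).toInt
```

This is also why idPointRDD in the question has the type RDD[(Char, Vector)]: the first element of each tuple is a Char.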
You need to split each line of your data: RDD[String] on the commas and convert the fields:
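import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

val data: RDD[String] = ???  // placeholder: your RDD of CSV lines
val idPointRDD = data.map { s =>
  // split each line into its three fields before converting them
  s.split(",") match {
    case Array(x, y, z) =>
      Vectors.dense(x.toDouble, Integer.parseInt(y).toDouble, Integer.parseInt(z).toDouble)
  }
}.cache()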
This should work for you!
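One caveat with the pattern match above: a line that does not have exactly three fields raises a scala.MatchError, and a non-numeric field raises a NumberFormatException. Below is a hedged sketch of a more defensive parser in plain Scala. parseLine is a hypothetical helper, not part of Spark, and it returns plain doubles so it can be tried without a cluster:

```scala
import scala.util.Try

// Hypothetical helper: parse one CSV line into three doubles,
// returning None for lines that are malformed.
def parseLine(s: String): Option[(Double, Double, Double)] =
  s.split(",") match {
    case Array(x, y, z) =>
      Try((x.trim.toDouble, y.trim.toDouble, z.trim.toDouble)).toOption
    case _ => None
  }

// Two rows from the question plus one deliberately broken line.
val rows = Seq("11.97,1355,401", "bad line", "3.49,25579,12908")

// flatMap keeps the successfully parsed rows and drops the None results.
val parsed = rows.flatMap(r => parseLine(r))
```

On an RDD the same idea would presumably be data.flatMap(r => parseLine(r)), with the resulting tuples then mapped into Vectors.dense.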
Upvotes: 1