Reputation: 888
How do I convert a CSV to RDD[Double]? I get the error "cannot be applied to (org.apache.spark.rdd.RDD[Unit])" at this line:
val kd = new KernelDensity().setSample(rows)
My full code is here:
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.stat.KernelDensity
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
class KdeAnalysis {
  val conf = new SparkConf().setAppName("sample").setMaster("local")
  val sc = new SparkContext(conf)

  val DATAFILE: String = "C:\\Users\\ajohn\\Desktop\\spark_R\\data\\mass_cytometry\\mass.csv"

  val rows = sc.textFile(DATAFILE).map { line =>
    val values = line.split(',').map(_.toDouble)
    Vectors.dense(values)
  }.cache()

  // Construct the density estimator with the sample data and a standard deviation
  // for the Gaussian kernels
  val rdd : RDD[Double] = sc.parallelize(rows)
  val kd = new KernelDensity().setSample(rdd)
    .setBandwidth(3.0)

  // Find density estimates for the given values
  val densities = kd.estimate(Array(-1.0, 2.0, 5.0))
}
Upvotes: 0
Views: 2564
Reputation: 2072
I have prepared this code; please evaluate whether it can help you out:
val doubleRDD = rows.map(_.toArray).flatMap(x => x)
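A minimal usage sketch, assuming rows is the RDD[Vector] built in the question and reusing its bandwidth and evaluation points:

// Flatten each Vector into its Double components
val doubleRDD = rows.map(_.toArray).flatMap(x => x)

// setSample accepts the resulting RDD[Double] directly
val kd = new KernelDensity()
  .setSample(doubleRDD)
  .setBandwidth(3.0)
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))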
Upvotes: 0
Reputation: 330113
Since rows is an RDD[org.apache.spark.mllib.linalg.Vector], the following line cannot work:
val rdd : RDD[Double] = sc.parallelize(rows)
parallelize expects a Seq[T], and an RDD is not a Seq.
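To make the distinction concrete, a small sketch (the values are illustrative only):

// parallelize distributes a local Seq[Double] into an RDD[Double]
val fromSeq: RDD[Double] = sc.parallelize(Seq(1.0, 2.0, 5.0))

// rows is already an RDD, so it has to be transformed with RDD
// operations such as map or flatMap, not passed to parallelize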
Even if this part worked as you expect, your input is simply wrong. A correct argument for KernelDensity.setSample is either an RDD[Double] or a JavaRDD[java.lang.Double]. It looks like it doesn't support multivariate data at the moment.
Regarding the question from the title, you can flatMap:
rows.flatMap(_.toArray)
or, even better, do it when you create rows:
val rows = sc.textFile(DATAFILE).flatMap(_.split(',').map(_.toDouble)).cache()
but I doubt it is really what you need.
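For completeness, a sketch of how the flattened version would feed the estimator, with the bandwidth and evaluation points taken from the question:

// With rows built via the flatMap above, it is already an RDD[Double]
val kd = new KernelDensity()
  .setSample(rows)
  .setBandwidth(3.0)
val densities = kd.estimate(Array(-1.0, 2.0, 5.0))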
Upvotes: 2