Mjas

Reputation: 129

Need some input on feature extraction in Apache Spark

I am new to Apache Spark, and we are trying to use the MLlib utility to do some analysis. I put together some code to convert my data into feature vectors and then apply a linear regression algorithm to them. I am facing some issues. Please help, and excuse me if this is a silly question.

My person data looks like

1,1000.00,36
2,2000.00,35
3,2345.50,37
4,3323.00,45

Just a simple example to get the code working

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint

case class Person(rating: String, income: Double, age: Int)

val persondata = sc.textFile("D:/spark/mydata/persondata.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).toDouble, p(2).toInt))

def prepareFeatures(people: Seq[Person]): Seq[org.apache.spark.mllib.linalg.Vector] = {
  val maxIncome = people.map(_.income).max
  val maxAge = people.map(_.age).max

  people.map (p =>
    Vectors.dense(
      if (p.rating == "A") 0.7 else if (p.rating == "B") 0.5 else 0.3,
      p.income / maxIncome,
      p.age.toDouble / maxAge))
}


def prepareFeaturesWithLabels(features: Seq[org.apache.spark.mllib.linalg.Vector]): Seq[LabeledPoint] =
  (0d to 1 by (1d / features.length)) zip(features) map(l => LabeledPoint(l._1, l._2))

---It works up to here.
---It breaks in the code below:

val data = sc.parallelize(prepareFeaturesWithLabels(prepareFeatures(people)))

scala> val data = sc.parallelize(prepareFeaturesWithLabels(prepareFeatures(people)))
<console>:36: error: not found: value people
Error occurred in an application involving default arguments.
       val data = sc.parallelize(prepareFeaturesWithLabels(prepareFeatures(people)))
                                                                           ^

Please advise

Upvotes: 1

Views: 348

Answers (1)

Holden

Reputation: 7442

You seem to be going in roughly the right direction, but there are a few problems. First off, you are trying to reference a value (`people`) that you haven't defined anywhere (your earlier line defines `persondata`, not `people`). More generally, you have written your code to work with local sequences (`Seq[Person]`), when instead it should work with RDDs (or DataFrames). Relatedly, you are using `parallelize` to try to parallelize your operation, but `parallelize` is a helper method that takes a *local* collection and makes it available as a distributed RDD; since your data already comes from `sc.textFile`, it is already an RDD and doesn't need to be parallelized again. I'd recommend looking at the programming guides or some additional documentation to get a better understanding of the Spark APIs. Best of luck with your adventures with Spark.
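To make that concrete, here is a minimal sketch of the RDD-based approach, assuming the same `persondata.txt` layout as in the question. The choice of label here (the income itself) is purely illustrative; substitute whatever quantity you actually want to predict:

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

case class Person(rating: String, income: Double, age: Int)

// Read the file as an RDD[Person] instead of collecting it to a local Seq.
val people = sc.textFile("D:/spark/mydata/persondata.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).toDouble, p(2).toInt))
  .cache() // reused below by the max() actions and the map()

// RDD actions compute the normalization constants across the cluster.
val maxIncome = people.map(_.income).max()
val maxAge    = people.map(_.age).max()

// An RDD transformation builds the labeled points; no parallelize needed,
// because the data is already distributed.
val data = people.map { p =>
  val features = Vectors.dense(
    if (p.rating == "A") 0.7 else if (p.rating == "B") 0.5 else 0.3,
    p.income / maxIncome,
    p.age.toDouble / maxAge)
  // Hypothetical label: the income itself, just to show the shape.
  LabeledPoint(p.income, features)
}
```

With the data kept as an RDD end to end, `data` can be fed straight into MLlib's regression trainers without any local collection or re-parallelization.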

Upvotes: 1

Related Questions