Reputation: 659
Because MLlib does not support sparse input, I run the following code, which supports the sparse input format, on a Spark cluster. The settings are:
The code is:
import java.util.Random
import scala.collection.mutable.HashMap
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Vector
import java.lang.Math
import org.apache.spark.broadcast.Broadcast
/**
 * Logistic regression over sparse feature vectors, trained with batch
 * gradient descent on a Spark cluster.
 *
 * Input lines are tab-separated: the first field is the integer label and
 * every following field containing ':' is an `index:value` feature pair.
 */
object SparseLR {
  val lableNum = 1
  // Length of the dense weight vector; every feature index must be < dimNum.
  val dimNum = 632918
  // Number of gradient-descent passes over the data set.
  val iteration = 10
  // Step size applied to the gradient on each pass.
  val alpha = 0.1
  // Multiplier on the weight vector that is added to the summed gradient.
  val lambda = 0.1
  val rand = new Random(42)
  // Current weights, randomly initialised; updated in place by `train`.
  var w = Vector(dimNum, _ => rand.nextDouble)

  /** Sparse vector stored as a mutable map from feature index to value. */
  class SparserVector {
    var elements = new HashMap[Int, Double]

    /** Sets (or overwrites) the value stored at `index`. */
    def insert(index: Int, value: Double): Unit = {
      elements += index -> value
    }

    /**
     * Scales this vector and returns the result as a dense `Vector` of
     * length `dimNum`; indices that are not present stay at 0.0.
     */
    def *(scale: Double): Vector = {
      val x = new Array[Double](dimNum)
      // Iterate over the entries directly instead of looking each key up again.
      for ((k, v) <- elements) x(k) = scale * v
      Vector(x)
    }
  }

  /** One training example: sparse features `x` and integer label `y`. */
  case class DataPoint(x: SparserVector, y: Int)

  /**
   * Parses one tab-separated input line into a `DataPoint`.
   *
   * @param line text of the form `label<TAB>index:value<TAB>index:value...`
   * @return the parsed label and sparse features
   */
  def parsePoint(line: String): DataPoint = {
    val features = new SparserVector
    val fields = line.split("\t")
    //println("fields:" + fields(0))
    val y = fields(0).toInt
    fields.filter(_.contains(":")).foreach { f =>
      val feature = f.split(":")
      features.insert(feature(0).toInt, feature(1).toDouble)
    }
    DataPoint(features, y)
  }

  /**
   * Computes the dense gradient contribution of a single data point.
   *
   * @param p the data point
   * @param w broadcast copy of the current weight vector
   * @return a dense vector of length `dimNum`
   */
  def gradient(p: DataPoint, w: Broadcast[Vector]): Vector = {
    def h(w: Broadcast[Vector], x: SparserVector): Double = {
      val wb = w.value
      // Accumulate the sparse dot product entry by entry. Mapping over
      // `keySet` would build a Set[Double], silently dropping terms whose
      // products happen to be equal, and `reduce` would throw on a data
      // point without features.
      var s = 0.0
      for ((k, v) <- x.elements) s += v * wb(k)
      1 / (1 + Math.exp(-p.y * s))
    }
    // NOTE(review): for labels in {-1, +1} the usual logistic-loss gradient
    // is (h - 1) * y * x; the expression below is kept as originally
    // written -- confirm the intended label convention.
    p.x * (-(1 - p.y * h(w, p.x)))
  }

  /**
   * Runs `iteration` rounds of batch gradient descent, updating `w` in place
   * and printing the gradient after each round.
   *
   * @param sc         Spark context used to broadcast the weights
   * @param dataPoints the training set
   */
  def train(sc: SparkContext, dataPoints: RDD[DataPoint]): Unit = {
    //val sampleNum = dataPoints.count
    // Hard-coded sample count used to normalise the summed gradient.
    val sampleNum = 11680250
    for (i <- 0 until iteration) {
      // Re-broadcast the weights so every task reads this round's values.
      val wb = sc.broadcast(w)
      val g = (dataPoints.map(p => gradient(p, wb)).reduce(_ + _) + lambda * wb.value) / sampleNum
      w -= alpha * g
      println("iteration " + i + ": g = " + g)
    }
  }

  /** Entry point: configures Spark, loads and caches the data, then trains. */
  def main(args: Array[String]): Unit = {
    System.setProperty("spark.executor.memory", "15g")
    System.setProperty("spark.default.parallelism", "32")
    val sc = new SparkContext("spark://xxx:12036", "LR", "/xxx/spark", List("xxx_2.9.3-1.0.jar"))
    val lines = sc.textFile("hdfs:xxx/xxx.txt", 32)
    // Cache the parsed points because they are traversed once per iteration.
    val trainset = lines.map(parsePoint _).cache()
    train(sc, trainset)
  }
}
Can anyone help me? Thanks!
Upvotes: 4
Views: 1243
Reputation: 12565
It is really hard to give you a definitive answer for this. Maybe this would be a better fit for the Code Review Stack Exchange site?
Some things that are immediately obvious:
Your gradient function seems inefficient. When you want to do something for each key/value pair of a map, it is much more efficient to do
for((k,v)<-map) {
...
}
than to do
for(k<-map.keySet) { val value = map.get(k).get;
...
}
Also, for performance critical code like this it might be preferable to change the reduce to accumulating a mutable value. So the rewritten gradient function would be
/**
 * Computes the dense gradient contribution of a single data point.
 *
 * @param p the data point (sparse features `x`, integer label `y`)
 * @param w broadcast copy of the current weight vector
 * @return the scaled feature vector as a dense `Vector`
 */
def gradient(p: DataPoint, w: Broadcast[Vector]): Vector = {
  // Sigmoid of the label-signed dot product between the weights and `x`.
  def h(w: Broadcast[Vector], x: SparserVector): Double = {
    val weights = w.value
    // Mutable accumulator: walks the (index, value) entries exactly once.
    var dot = 0.0
    x.elements.foreach { case (idx, value) => dot += value * weights(idx) }
    1 / (1 + Math.exp(-p.y * dot))
  }

  val sigma = h(w, p.x)
  p.x * (-(1 - p.y * sigma))
}
Now, if you want to increase the performance even more, you will have to change the SparseVector to use an array of indices and an array of values instead of a Map[Int, Double]. The reason for this is that in a Map the keys and values will be boxed as objects with considerable overhead, while an Array[Int] or an Array[Double] is just a single compact chunk of memory.
(For convenience it might be advisable to define a builder that uses a SortedMap[Int, Double] and converts into two arrays when finished building)
/**
 * Sparse vector stored as two parallel primitive arrays: `indices(i)` is the
 * position of the i-th stored entry and `values(i)` is its value.
 */
class SparseVector(val indices: Array[Int], val values: Array[Double]) {
  require(indices.length == values.length)

  /**
   * Scales this vector and returns the result as a dense `Vector` of length
   * `dimNum`; positions that are not listed in `indices` stay at 0.0.
   */
  def *(scale: Double): Vector = {
    val dense = new Array[Double](dimNum)
    val count = indices.length
    // Plain while loop over the primitive arrays.
    var pos = 0
    while (pos < count) {
      dense(indices(pos)) = values(pos) * scale
      pos += 1
    }
    Vector(dense)
  }
}
Note that the code examples above are not tested, but I guess you will get the idea.
Upvotes: 4