Reputation: 78
I've been trying to extract information out of a 1-column Spark DataFrame of Doubles and put it into a Breeze SparseVector. To do this, I go through every element of the one-column DataFrame, force each to be a Double, and add it to the VectorBuilder. The VectorBuilder correctly mutates its state inside the foreach loop, but all of the changes are gone once the loop ends. Why does this happen? Is there a workaround?
EDIT 1:
I'm running this locally with 1 core; it's not on a cluster
Code:
val accum = sc.accumulator(0, "Counter")

def correlate(): Unit = {
  val cols = df.columns
  val id = cols(0)
  val id2 = cols(1)
  // id and id2 are used in the null filter of the query below
  val df1 = sqlContext.sql(s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)")
  // df1 is a DataFrame with a single column
  df1.show()
  accum.value_=(0)

  /******************** Problem starts here **********************/
  val builder = new VectorBuilder[Double](5)
  df1.foreach { x =>
    x(0) match {
      case d: Double =>
        builder.add(accum.value, d)
        // This print statement prints the correct values
        println(s"index: ${accum.value} value: ${builder(accum.value)}")
        accum.value += 1
        println(s"builder's active size in loop: ${builder.activeSize}")
      case _ => throw new ClassCastException("Pattern matching for Double failed")
    }
  }
  // builder appears empty at this point
  println(s"builder's active size out of loop: ${builder.activeSize}")
  val sparse = builder.toSparseVector
  sparse.foreachPair { (i, v) => println(s"index: ${i} and value: ${v}") }
}

this.correlate()
this.correlate()
Output:
+-------+
| RowX|
+-------+
| 145.0|
| -1.0|
|-212.21|
| 23.3|
| 21.4|
+-------+
index: 0 value: 145.0
builder's active size in loop: 1
index: 1 value: -1.0
builder's active size in loop: 2
index: 2 value: -212.21
builder's active size in loop: 3
index: 3 value: 23.3
builder's active size in loop: 4
index: 4 value: 21.4
builder's active size in loop: 5
//the loop ends here and builder's state disappears
builder's active size out of loop: 0
index: 0 and value: 0.0
index: 1 and value: 0.0
index: 2 and value: 0.0
index: 3 and value: 0.0
index: 4 and value: 0.0
Upvotes: 1
Views: 202
Inside foreach, each task runs against its own deserialized copy of builder, so the driver's copy is never modified. This happens even in local mode, because the closure is still serialized and handed to the task. To build the object on the driver, collect the column first:
SparseVector(df1.rdd.map(_.getDouble(0)).collect)
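The effect can be reproduced without Spark at all. The sketch below is illustrative, not Spark API (ClosureCopyDemo, roundTrip, and demo are made-up names, and an ArrayBuffer stands in for VectorBuilder): it serializes a closure the way Spark ships one to a task, lets the deserialized copy do all the adding, and shows the original capture is untouched.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object ClosureCopyDemo {
  // Serialize and deserialize an object, roughly the way Spark ships a
  // closure (and everything it captures) from the driver to a task.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  // Returns the size of the "driver"-side buffer after the task's copy
  // of the closure has done all the adding.
  def demo(): Int = {
    val builder = ArrayBuffer[Double]()          // stands in for VectorBuilder
    val add: Double => Unit = d => builder += d  // closure capturing builder
    val taskCopy = roundTrip(add)                // what a task actually runs
    Seq(145.0, -1.0, -212.21, 23.3, 21.4).foreach(taskCopy)
    builder.size                                 // the original was never touched
  }

  def main(args: Array[String]): Unit =
    println(s"driver-side builder size: ${demo()}")  // prints 0
}
```

Spark does the same with the function passed to df1.foreach, which is why builder.activeSize is 0 back on the driver while the prints inside the loop look correct.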
Upvotes: 1