Reputation: 769
I am trying to implement a self-learning approach for training a classifier, using Spark 1.6.0. The problem is that when I map one RDD to another I get wrong counts. The same code works fine for small datasets, but on a larger dataset it just goes nuts.
println("INITIAL TRAINING SET SIZE : " + trainingSetInitial.count())
for (counter <- 1 to 10) {
  println("------------------- This is the_" + counter + " run -----------------")
  println("TESTING SET SIZE : " + testing.count())

  val lowProbabilitiesSet = testing.flatMap { item =>
    if (model.predictProbabilities(item._2)(0) <= 0.75 && model.predictProbabilities(item._2)(1) <= 0.75) {
      List(item._1)
    } else {
      None
    }
  }.cache()

  val highProbabilitiesSet = testing.flatMap { item =>
    if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) {
      List(item._1 + "," + model.predict(item._2).toDouble)
    } else {
      None
    }
  }.cache()

  println("LOW PROBAB SET : " + lowProbabilitiesSet.count())
  println("HIGH PROBAB SET : " + highProbabilitiesSet.count())

  trainingSetInitial = trainingSetInitial.union(highProbabilitiesSet.map { x =>
    LabeledPoint(x.split(",")(8).toDouble, htf.transform(x.split(",")(7).split(" ")))
  })
  model = NaiveBayes.train(trainingSetInitial, lambda = 1.0)
  println("NEW TRAINING SET : " + trainingSetInitial.count())

  previousCount = lowProbabilitiesSet.count()
  testing = lowProbabilitiesSet.map { line =>
    val parts = line.split(',')
    val text = parts(7).split(' ')
    (line, htf.transform(text))
  }
  testing.checkpoint()
}
This is the log from the correct output:
INITIAL TRAINING SET SIZE : 238.182
------------------- This is the_1 run -----------------
TESTING SET SIZE : 3.158.722
LOW PROBAB SET : 22.996
HIGH PROBAB SET : 3.135.726
NEW TRAINING SET : 3373908
------------------- This is the_2 run -----------------
TESTING SET SIZE : 22996
LOW PROBAB SET : 566
HIGH PROBAB SET : 22430
NEW TRAINING SET : 3396338
And here is when the problem begins (large dataset input):
INITIAL TRAINING SET SIZE : 31.990.660
------------------- This is the_1 run -----------------
TESTING SET SIZE : 423.173.780
LOW PROBAB SET : 62.615.460
HIGH PROBAB SET : 360.558.320
NEW TRAINING SET : 395265857
------------------- This is the_2 run -----------------
TESTING SET SIZE : 52673986
LOW PROBAB SET : 51460875
HIGH PROBAB SET : 1213111
NEW TRAINING SET : 401950263
The 'LOW PROBAB SET' from the first iteration should become the 'TESTING SET' for the second iteration. Somewhere, somehow, 10 million entries disappear. Also, the 'NEW TRAINING SET' on the 1st iteration should be the union of the 'INITIAL TRAINING' and the 'HIGH PROB SET'. Again, the numbers don't match.
I did not get any errors while the code was running. I tried caching each set and unpersisting it at the end of each iteration (HIGH and LOW sets only), but got the same results. I also tried checkpointing the sets; that didn't work either. Why is this happening?
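One detail worth checking: in Spark 1.6, checkpoint() is a no-op unless a checkpoint directory has been configured on the SparkContext, and it only writes data out when an action runs on the RDD after checkpoint() is called. A minimal sketch of the end of the loop body with those two conditions satisfied (the HDFS path is a placeholder):

```scala
// Must be set once, before any checkpoint() call takes effect.
sc.setCheckpointDir("hdfs:///tmp/selftrain-checkpoints") // placeholder path

testing = lowProbabilitiesSet.map { line =>
  val parts = line.split(',')
  (line, htf.transform(parts(7).split(' ')))
}.cache()            // cache first so the checkpoint write reuses computed partitions
testing.checkpoint() // only marks the RDD for checkpointing
testing.count()      // an action is required to actually materialize the checkpoint
```

Without the action after checkpoint(), the lineage is never truncated, so every later count() may re-run the predictProbabilities filters against whatever model is current at that moment.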
EDIT
Just for testing, I kept the original model and did not retrain inside the loop, to see what happens:
for (counter <- 1 to 5) {
  println("------------------- This is the_" + counter + " run !!! -----------------")
  var updated_trainCnt = temp_train.count()
  var updated_testCnt = test_set.count()
  println("Updated Train SET SIZE: " + updated_trainCnt)
  println("Updated Testing SET SIZE: " + updated_testCnt)

  val highProbabilitiesSet = test_set.filter { item =>
    val output = model.predictProbabilities(item._2)
    output(0) > 0.75 || output(1) > 0.75
  }.map(item => (item._1 + "," + model.predict(item._2), item._2)).cache()

  test_set = test_set.filter { item =>
    val output = model.predictProbabilities(item._2)
    output(0) <= 0.75 && output(1) <= 0.75
  }.cache()

  var hiCnt = highProbabilitiesSet.count()
  var lowCnt = test_set.count()
  println("HIGH PROBAB SET : " + hiCnt)
  println("LOW PROBAB SET : " + lowCnt)
  var diff = updated_testCnt - hiCnt - lowCnt
  if (diff != 0) println("ERROR: Test set not correctly split into high/low: " + diff)

  temp_train = temp_train.union(highProbabilitiesSet.map(x => LabeledPoint(x._1.split(",")(8).toDouble, x._2))).cache()
  println("NEW TRAINING SET: " + temp_train.count())
  // model = NaiveBayes.train(temp_train, lambda = 1.0, modelType = "multinomial")
  println("HIGH PROBAB SET : " + highProbabilitiesSet.count())
  println("LOW PROBAB SET : " + test_set.count())
  println("NEW TRAINING SET: " + temp_train.count())
}
The numbers produced with the original model were OK, and even the union of the RDDs was performed without an issue. But the big question remains: how does the classification model corrupt the training set (lowProbabilitiesSet) without the loop even modifying it (or the other RDDs)?
The console and Spark logs show no errors and no executor crashes. How does training the classifier corrupt my data?
Upvotes: 2
Views: 356
Reputation: 769
Even though I still haven't figured out why this is happening, as a workaround I flushed the RDDs to HDFS and wrote a bash script that runs the class iteratively, reading the data back from HDFS each time. As far as I can tell, the problem appears when I train the classifier inside the loop.
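Writing the intermediate sets out and reading them back breaks the RDD lineage entirely, which is why this sidesteps the problem. A sketch of what that flush/reload could look like (paths and iteration naming are hypothetical):

```scala
// End of one iteration: persist the remaining test set to HDFS.
lowProbabilitiesSet.saveAsTextFile(s"hdfs:///selftrain/iter_$counter/testing")

// Start of the next run: read it back as a fresh RDD with no lineage
// pointing at the previous model's predictions.
val testingLines = sc.textFile(s"hdfs:///selftrain/iter_$counter/testing")
```

The reloaded RDD depends only on the files on disk, so retraining the model can no longer change what a later count() computes.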
Upvotes: 0
Reputation: 2250
I don't see the problem right away. Please minimise the code to the actual problem. As a first step, I would suggest rewriting the flatMap operations as a filter, from:
val highProbabilitiesSet = testing.flatMap { item =>
  if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) {
    List(item._1 + "," + model.predict(item._2).toDouble)
  } else {
    None
  }
}.cache()
to:
val highProbabilitiesSet = testing.filter { item =>
  val output = model.predictProbabilities(item._2)
  output(0) > 0.75 || output(1) > 0.75
}.map(item => (item._1, model.predict(item._2).toDouble)).cache()
Upvotes: -2