Spark: RDD missing entries in each iteration

I am trying to implement a self-learning approach for training a classifier. I am using Spark 1.6.0. The problem is that when I map one RDD to another I get wrong counts. The same code works fine for small datasets, but on a larger dataset it goes haywire.

println("INITIAL TRAINING SET SIZE : " + trainingSetInitial.count())
for(counter <- 1 to 10){
  println("-------------------  This is the_" + counter + " run -----------------")
  println("TESTING SET SIZE : "  + testing.count())

  val lowProbabilitiesSet = testing.flatMap { item =>
    // Keep entries where neither class is predicted with probability > 0.75
    if (model.predictProbabilities(item._2)(0) <= 0.75 && model.predictProbabilities(item._2)(1) <= 0.75) {
      List(item._1)
    } else {
      None
    }
  }.cache()

  val highProbabilitiesSet = testing.flatMap { item =>
    // Keep confidently classified entries, tagged with the predicted label
    if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) {
      List(item._1 + "," + model.predict(item._2).toDouble)
    } else {
      None
    }
  }.cache()
  println("LOW PROBAB SET : "  + lowProbabilitiesSet.count())
  println("HIGH PROBAB SET : "  + highProbabilitiesSet.count())

  trainingSetInitial = trainingSetInitial.union(highProbabilitiesSet.map(x => LabeledPoint(List(x)(0).split(",")(8).toString.toDouble, htf.transform(List(x)(0).toString.split(",")(7).split(" ") ))))
  model = NaiveBayes.train(trainingSetInitial, lambda = 1.0)
  println("NEW TRAINING SET : "  + trainingSetInitial.count())

  previousCount = lowProbabilitiesSet.count()
  testing = lowProbabilitiesSet.map { line =>
    val parts = line.split(',')
    val text = parts(7).split(' ')
    (line, htf.transform(text))
  }
  testing.checkpoint()
}

This is the log from the correct output:

INITIAL TRAINING SET SIZE : 238.182

------------------- This is the_1 run -----------------

TESTING SET SIZE : 3.158.722

LOW PROBAB SET : 22.996

HIGH PROBAB SET : 3.135.726

NEW TRAINING SET : 3373908

------------------- This is the_2 run -----------------

TESTING SET SIZE : 22996

LOW PROBAB SET : 566

HIGH PROBAB SET : 22430

NEW TRAINING SET : 3396338

And here is when the problem begins (large dataset input):

INITIAL TRAINING SET SIZE : 31.990.660

------------------- This is the_1 run -----------------

TESTING SET SIZE : 423.173.780

LOW PROBAB SET : 62.615.460

HIGH PROBAB SET : 360.558.320

NEW TRAINING SET : 395265857

------------------- This is the_2 run -----------------

TESTING SET SIZE : 52673986

LOW PROBAB SET : 51460875

HIGH PROBAB SET : 1213111

NEW TRAINING SET : 401950263

The 'LOW PROBAB SET' of the first iteration should become the 'TESTING SET' of the second iteration. Somewhere, somehow, 10 million entries disappear. Likewise, the 'NEW TRAINING SET' of the 1st iteration should be the union of the 'INITIAL TRAINING' set and the 'HIGH PROBAB SET'. Again the numbers don't match.

I did not get any errors while the code was running. I tried caching each set and unpersisting it at the end of each iteration (HIGH and LOW sets only), but got the same results. I also tried checkpointing the sets; that didn't work either. Why is this happening?
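One plausible mechanism (my own reading, not confirmed anywhere in this thread): the low/high sets are defined lazily over `model`, a var that is reassigned every iteration. If cached partitions are evicted under memory pressure on the large input, Spark recomputes them from the lineage, and the recomputed closure sees the retrained model, so the split comes out differently. The same effect can be reproduced in plain Scala with a lazy view over a mutable variable:

```scala
// A lazy pipeline (like an uncached RDD lineage) that reads a mutable
// "model" each time it is evaluated.
var threshold = 5                              // stands in for the classifier
val data = (1 to 10).toList
val low = data.view.filter(_ <= threshold)     // lazy: nothing computed yet

val firstCount = low.size                      // evaluated with threshold = 5
threshold = 2                                  // "retrain" between evaluations
val secondCount = low.size                     // re-evaluated with threshold = 2

println(s"first=$firstCount second=$secondCount")  // first=5 second=2
```

In the Spark version, `cache()` hides this as long as the cached partitions survive, which would explain why the small dataset behaves while the large one does not.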

EDIT

Just for testing, I kept the model fixed inside the loop (no retraining) to see what happens:

for(counter <- 1 to 5){
  println("-------------------  This is the_" + counter + " run !!! -----------------")
  var updated_trainCnt = temp_train.count();
  var updated_testCnt = test_set.count();
  println("Updated Train SET SIZE: "  + updated_trainCnt)
  println("Updated Testing SET SIZE: "  + updated_testCnt)

  val highProbabilitiesSet = test_set.filter { item =>
    val output = model.predictProbabilities(item._2)
    output(0) > 0.75 || output(1) > 0.75
  }.map(item => (item._1 + "," + model.predict(item._2), item._2 )).cache()

  test_set = test_set.filter { item =>
    val output = model.predictProbabilities(item._2)
    output(0) <= 0.75 && output(1) <= 0.75
  }.map(item => (item._1, item._2)).cache()
  var hiCnt = highProbabilitiesSet.count()
  var lowCnt = test_set.count()
  println("HIGH PROBAB SET : "  + hiCnt)
  println("LOW PROBAB SET  : "  +  lowCnt)
  var diff = updated_testCnt - hiCnt - lowCnt
  if (diff!=0) println("ERROR: Test set not correctly split into high low" + diff)
  temp_train= temp_train.union(highProbabilitiesSet.map(x => LabeledPoint(x._1.toString.split(",")(8).toDouble, x._2 ))).cache()
  println("NEW TRAINING SET: "  + temp_train.count())
//      model = NaiveBayes.train(temp_train, lambda = 1.0, modelType = "multinomial")
  println("HIGH PROBAB SET : "  + highProbabilitiesSet.count())
  println("LOW PROBAB SET  : "  + test_set.count())
  println("NEW TRAINING SET: "  + temp_train.count())
}

With the original model left untouched, the produced numbers were OK, and even the union of RDDs was performed without an issue. So the big question remains: how does retraining the classification model at the end of each loop corrupt the training set (lowProbabilitiesSet) or the other RDDs, when the code never modifies them?

Console logs and Spark logs show no error and no executor crash. How does the classification training process corrupt my data?
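If the lazy-recomputation reading above is right, one fix (a sketch, not tested against the original job) is to fully materialize the split before reassigning `model`, so that later recomputations can never observe the new classifier. In RDD terms that means `checkpoint()` followed by a forcing action such as `count()` before calling `NaiveBayes.train`; the plain-Scala analog is forcing the lazy pipeline into a strict collection before mutating the state it depends on:

```scala
var threshold = 5                   // stands in for the classifier
val data = (1 to 10).toList

// Force the lazy pipeline into a strict List *before* mutating `threshold`,
// the way rdd.checkpoint() + rdd.count() would freeze the split before
// retraining the model.
val low = data.view.filter(_ <= threshold).toList

val before = low.size
threshold = 2                       // "retrain": no effect on the frozen result
val after = low.size

println(s"before=$before after=$after")  // before=5 after=5
```

Note that `rdd.checkpoint()` alone is not enough: it only marks the RDD, and the actual checkpointing happens on the next action, so an action must run before the model is reassigned.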

Upvotes: 2

Views: 356

Answers (2)

Even though I still haven't figured out why this is happening, as a workaround I flushed the RDDs to HDFS and wrote a bash script that runs the class iteratively, reading the data back from HDFS each time. As far as I can tell, the problem appears when the classifier is trained inside the loop.

Upvotes: 0

Fokko Driesprong

Reputation: 2250

I don't see the problem right away. Please minimise the code to the actual problem. As a first step, I would suggest rewriting the flatMap operations as a filter:

val highProbabilitiesSet = testing.flatMap { item =>
  if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75 ) {
      List(item._1 +","+ model.predict(item._2).toDouble )
  } else {
    None
  }
}.cache()

To:

val highProbabilitiesSet = testing.filter { item => 
  val output = model.predictProbabilities(item._2)
  output(0) > 0.75 || output(1) > 0.75
}.map(item => (item._1, model.predict(item._2).toDouble)).cache()

Upvotes: -2
