Reputation: 163
I have two RDDs of the form:
data_wo_header: RDD[String], data_test_wo_header: RDD[String]
scala> data_wo_header.first
res2: String = 1,2,3.5,1112486027
scala> data_test_wo_header.first
res2: String = 1,2
RDD2 is smaller than RDD1. I am trying to filter RDD1 by removing every element that matches an entry in RDD2.
In the example above, 1,2 represents UserID,MovID. Since that pair is present in the test set, I want a new RDD in which the corresponding row of RDD1 has been removed.
I asked a similar question before, but the answer required an unnecessary split of the RDD. I am trying to do something like this, but it's not working:
def create_training(data_wo_header: RDD[String], data_test_wo_header: RDD[String]): List[String] = {
    var ratings_train = new ListBuffer[String]()
    data_wo_header.foreach(x => {
        data_test_wo_header.foreach(y => {
            if (x.indexOf(y) == 0) {
                ratings_train += x
            }
        })
    })
    val ratings_train_list = ratings_train.toList
    return ratings_train_list
}
Upvotes: 2
Views: 791
Reputation: 2603
You can use a broadcast variable to share the contents of rdd2 with the executors and then filter rdd1 against the broadcast value. I replicated your code, and this works for me:
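def create_training(data_wo_header: RDD[String], data_test_wo_header: RDD[String]): List[String] = {
    // Collect the smaller RDD on the driver and broadcast it to the executors.
    val rdd2array = data_wo_header.sparkContext.broadcast(data_test_wo_header.collect())
    // Keep a row only if no test key "UserID,MovID" is a prefix of it.
    // String.matches must match the whole string, so a prefix test is used instead;
    // the trailing comma stops "1,2" from also matching a row that starts with "1,23".
    val training_set = data_wo_header.filter { x =>
        !rdd2array.value.exists(y => x.startsWith(y + ","))
    }
    training_set.collect().toList
}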
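The filter above scans the whole broadcast array for every row. If the test set is large, a set lookup may be cheaper. Below is a minimal sketch of that idea in plain Scala, assuming every row starts with UserID,MovID; the names TrainingFilter, keyOf and dropTestRows are made up for illustration and are not part of Spark.

```scala
object TrainingFilter {
  // Hypothetical helper: returns the "UserID,MovID" prefix of a CSV row,
  // i.e. everything before the second comma (or the whole row if there is none).
  def keyOf(row: String): String = {
    val first = row.indexOf(',')
    val second = if (first < 0) -1 else row.indexOf(',', first + 1)
    if (second < 0) row else row.substring(0, second)
  }

  // Keeps only the rows whose key is absent from testKeys.
  def dropTestRows(rows: Seq[String], testKeys: Set[String]): Seq[String] =
    rows.filter(row => !testKeys.contains(keyOf(row)))
}
```

With Spark, the same predicate could probably be passed to data_wo_header.filter, with the test keys collected into a Set and broadcast instead of the array.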
Also, with Scala and Spark I recommend avoiding foreach where possible and using a more functional style with map, flatMap and filter.
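As one possible illustration of that style, here is a sketch that keys both RDDs by UserID,MovID and uses subtractByKey, so nothing has to be collected on the driver and the result stays an RDD. It assumes every row starts with UserID,MovID; TrainingSplit, keyOf and createTraining are hypothetical names, not from your code.

```scala
import org.apache.spark.rdd.RDD

object TrainingSplit {
  // Hypothetical helper: the "UserID,MovID" prefix of a comma-separated row.
  def keyOf(row: String): String = row.split(",", 3).take(2).mkString(",")

  // Returns the rows of `data` whose key does not occur in `test`.
  def createTraining(data: RDD[String], test: RDD[String]): RDD[String] = {
    val keyedData = data.keyBy(keyOf)                  // (key, full row)
    val keyedTest = test.map(row => (keyOf(row), ()))  // (key, placeholder)
    keyedData.subtractByKey(keyedTest).values
  }
}
```

This trades the broadcast for a shuffle, so it is likely only worthwhile when the test set is too large to collect comfortably.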
Upvotes: 2