Reputation: 51
I am trying to use filter() inside map(), but I get this Spark exception:
RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
I know that Spark doesn't allow nested transformations, actions, or RDDs, so can anyone suggest an alternative way to do this without nesting them? I have an RDD whose elements look like this:
JavaRDD< String[]> RDD
I try to map it, passing a list as an argument. This list contains JavaPairRDDs:
List<JavaPairRDD<String,String>> list
JavaRDD< String[]> result = RDD.map(new modifyRDD(list));
The map() call above uses the modifyRDD class:
public static class modifyRDD implements Function <String[], String[]> {
List<JavaPairRDD<String,String>> list;
public modifyRDD (List<JavaPairRDD<String,String>> list ){ this.list=list;}
public String [] call(String[] t) {
String[] s = t;
for (int i = 0; i < NB_TD; i++) {
int j=i;
// select the appropriate RDD from the RDDs_list to the current index
JavaPairRDD<String,String> rdd_i = list.get(i);
String previousElement=s[j];
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
String newElement=currentRDD.first()._2();
s[j]=newElement;
}
return (s) ;
}
}
So the problem is in this line:
JavaPairRDD<String,String> currentRDD = rdd_i.filter(line -> line._1().equals(previousElement));
Here is an example. Suppose the list contains 2 PairRDDs:
list={PairRDD1={(a,b)(c,d)},PairRDD2={(u,v)(x,y)}..}
and the RDD that I want to map contains:
JavaRDD< String[]> RDD = {[a,u],[c,x],[a,x].....}
The result I want after map() is:
JavaRDD< String[]> result = {[b,v],[d,y],[b,y].....}
Upvotes: 2
Views: 933
Reputation: 51
I changed the type of the list from List<JavaPairRDD<String,String>> to List<List<Tuple2<String,String>>> to avoid dealing with RDDs inside map(). Now I get no exception (of course, because there are no nested transformations), but I am not sure whether my new code is efficient: the List<List<Tuple2<String,String>>> is large, and to find an element I use a for loop, which means scanning a whole inner list to get the element I want. I would appreciate remarks on this use of the for loop and suggestions to improve it. Thank you.
This is the map() function after the modification:
public static class modifyRDD implements Function <String[], String[]> {
List<List<Tuple2<String,String>>> list;
public modifyRDD (List<List<Tuple2<String,String>>> list ){ this.list=list;}
public String [] call(String[] t) {
String[] s = t;
for (int i = 0; i < NB_TD; i++) {
// select the appropriate lookup_list
List<Tuple2<String,String>> list_i = list.get(i);
String previousElement=s[i];
String newElement="";
for (int k = 0; k < list_i.size(); k++){
Tuple2<String,String> sk1 = list_i.get(k);
if (sk1._1.equals( previousElement)){ newElement=sk1._2;}
}
s[i]= newElement;
}
return(s);
}
}
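On the efficiency question: the inner for loop scans a whole lookup list for every element of every row. One possible improvement, sketched here in plain Java without Spark, is to build one HashMap per lookup list once and then do a constant-time lookup per element. The class name LookupSketch, the methods buildIndex and translate, and the use of String[2] pairs instead of Tuple2 are assumptions made only for this illustration. The sketch keeps the first value when a key is repeated (mirroring first() in the question) and falls back to "" when a key is missing, as the loop version does. In Spark, such maps could presumably be obtained with JavaPairRDD.collectAsMap() on the driver and shared with the executors through JavaSparkContext.broadcast(), provided they fit in memory.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupSketch {

    // Build one HashMap per column from (key, value) pairs stored as String[2].
    // Assumption: if a key appears more than once, the first value is kept.
    public static List<Map<String, String>> buildIndex(List<List<String[]>> pairsPerColumn) {
        List<Map<String, String>> index = new ArrayList<>();
        for (List<String[]> pairs : pairsPerColumn) {
            Map<String, String> byKey = new HashMap<>();
            for (String[] pair : pairs) {
                byKey.putIfAbsent(pair[0], pair[1]);
            }
            index.add(byKey);
        }
        return index;
    }

    // Replace each element of the row using the map for its column.
    // Works on a copy so the input array is not modified; missing keys become "".
    public static String[] translate(String[] row, List<Map<String, String>> index) {
        String[] out = row.clone();
        for (int i = 0; i < index.size() && i < out.length; i++) {
            out[i] = index.get(i).getOrDefault(out[i], "");
        }
        return out;
    }

    public static void main(String[] args) {
        // Same sample data as in the question.
        List<List<String[]>> pairs = Arrays.asList(
            Arrays.asList(new String[]{"a", "b"}, new String[]{"c", "d"}),
            Arrays.asList(new String[]{"u", "v"}, new String[]{"x", "y"}));
        List<Map<String, String>> index = buildIndex(pairs);

        System.out.println(Arrays.toString(translate(new String[]{"a", "u"}, index)));
        System.out.println(Arrays.toString(translate(new String[]{"c", "x"}, index)));
        System.out.println(Arrays.toString(translate(new String[]{"a", "x"}, index)));
    }
}
```

Building the maps costs one pass over each lookup list; after that, each element of a row is resolved with a single hash lookup instead of a scan of the whole list.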
Upvotes: 0