Reputation: 1175
I want to join two lists (noHeaderIndexed and noFirstIndexed) by key, like this:
final Broadcast<JavaPairRDD<Long, Tuple2<String, String>>> c = ctx.broadcast(noHeaderIndexed);
JavaPairRDD<Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>> rs = noFirstIndexed.mapToPair(new PairFunction<Tuple2<Long, Tuple2<String, String>>, Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>>() {
    @Override
    public Tuple2<Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>> call(Tuple2<Long, Tuple2<String, String>> longTuple2Tuple2) throws Exception {
        String s1 = "";
        if (c.value().lookup(longTuple2Tuple2._1).get(0)._1 != null)
            s1 = c.value().lookup(longTuple2Tuple2._1).get(0)._1;
        String s2 = "";
        if (c.value().lookup(longTuple2Tuple2._1).get(0)._2 != null)
            s2 = c.value().lookup(longTuple2Tuple2._1).get(0)._2;
        return new Tuple2<Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>>(
            new Tuple2<Tuple2<String, String>, Long>(new Tuple2<String, String>(longTuple2Tuple2._2._1, longTuple2Tuple2._2._2), longTuple2Tuple2._1),
            new Tuple2<Tuple2<String, String>, Long>(new Tuple2<String, String>(s1, s2), longTuple2Tuple2._1));
    }
});
//writeResult(rs, "rs.txt");
rs.coalesce(1, true).saveAsTextFile(path + "rs");
But when I try to execute it, it displays this:
INFO ShuffleMemoryManager: Thread 61 waiting for at least 1/2N of shuffle memory pool to be free
And the execution never terminates. Could you please explain the problem and how I can fix it?
Thank you in advance.
Upvotes: 1
Views: 831
Reputation: 79
In this command:
rs.coalesce(1,true).saveAsTextFile(path+ "rs");
you are creating only one partition, so all the data is sent to a single node. You need to increase the number of partitions.
Try this, choosing the number according to your data size:
rs.coalesce(10,true).saveAsTextFile(path+ "rs");
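If you are not sure which number to use, one rough way to pick it is to divide the total data size by a target size per partition and round up. The sketch below is plain Java with no Spark dependency. The class name PartitionEstimate, the method partitionsFor, and the 128 MiB target are assumptions made for illustration, not Spark settings, and the 1280 MiB input size is a made-up figure.

```java
// Hypothetical helper, not part of Spark: estimates a partition count from a data size.
final class PartitionEstimate {

    // Assumed target size per partition (128 MiB). This is a rule of thumb, not a Spark constant.
    static final long TARGET_PARTITION_BYTES = 128L * 1024 * 1024;

    // Returns ceil(totalBytes / targetBytes), but never less than 1.
    static int partitionsFor(long totalBytes, long targetBytes) {
        if (totalBytes < 0 || targetBytes <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and target must be positive");
        }
        // Ceiling division written without an addition that could overflow.
        long n = totalBytes / targetBytes + (totalBytes % targetBytes == 0 ? 0 : 1);
        return (int) Math.max(1L, Math.min(n, (long) Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        long totalBytes = 1280L * 1024 * 1024; // made-up input size of 1280 MiB
        int n = partitionsFor(totalBytes, TARGET_PARTITION_BYTES);
        System.out.println(n); // prints 10
        // The result would then replace the hard-coded value, e.g.:
        // rs.coalesce(n, true).saveAsTextFile(path + "rs");
    }
}
```

With these assumed numbers the result is 10, which matches the value used in the command above. A different target size or input size will give a different count, so treat it as a starting point and adjust after looking at how the job behaves.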
Upvotes: 1