Amine CHERIFI

Waiting for shuffle memory pool to be free: Spark with Java

I want to join two pair RDDs (noHeaderIndexed and noFirstIndexed) by key, like this:

    final  Broadcast<JavaPairRDD<Long, Tuple2<String, String>>> c = ctx.broadcast(noHeaderIndexed);
    JavaPairRDD<Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>> rs = noFirstIndexed.mapToPair(new PairFunction<Tuple2<Long, Tuple2<String, String>>, Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>>() {
        @Override
        public Tuple2<Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>> call(Tuple2<Long, Tuple2<String, String>> longTuple2Tuple2) throws Exception {
            String s1 = "";
            if (c.value().lookup(longTuple2Tuple2._1).get(0)._1 != null)
                s1 = c.value().lookup(longTuple2Tuple2._1).get(0)._1;

            String s2 = "";
            if (c.value().lookup(longTuple2Tuple2._1).get(0)._2 != null)
                s2 = c.value().lookup(longTuple2Tuple2._1).get(0)._2;
            return new Tuple2<Tuple2<Tuple2<String, String>, Long>, Tuple2<Tuple2<String, String>, Long>>(new Tuple2<Tuple2<String, String>, Long>(new Tuple2<String, String>(longTuple2Tuple2._2._1,longTuple2Tuple2._2._2),longTuple2Tuple2._1),new Tuple2<Tuple2<String, String>, Long>(new Tuple2<String, String>(s1,s2),longTuple2Tuple2._1 ) );
        }
    });

    //writeResult(rs, "rs.txt");
    rs.coalesce(1,true).saveAsTextFile(path+ "rs");    

But when I try to run it, it prints this:

    INFO ShuffleMemoryManager: Thread 61 waiting for at least 1/2N of shuffle memory pool to be free

The job never finishes. Could you please explain what the problem is and how I can fix it?

Thank you in advance.
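The log line comes from the ShuffleMemoryManager used in Spark 1.x. Its documented rule is that when N tasks run at once, each task may hold at most 1/N of the shuffle memory pool and is guaranteed at least 1/(2N) before it has to spill; a task that is below 1/(2N) and cannot get enough free memory to reach it blocks until other tasks release memory, which is when this message is printed. The plain-Java sketch below models only that arithmetic. It is a simplification (it ignores the size of each individual request), the numbers are hypothetical, and `ShuffleShareSketch`, `maxPerTask`, `minPerTask` and `wouldWait` are illustrative names, not Spark API.

```java
// Simplified model (an assumption, not Spark's actual code) of the fair-share rule
// documented for Spark 1.x's ShuffleMemoryManager. All names are hypothetical.
final class ShuffleShareSketch {

    /** Most memory one task may hold while numActiveTasks tasks share the pool: 1/N. */
    static long maxPerTask(long poolBytes, int numActiveTasks) {
        return poolBytes / numActiveTasks;
    }

    /** Memory a task is guaranteed before it must spill: 1/(2N), the "1/2N" in the log. */
    static long minPerTask(long poolBytes, int numActiveTasks) {
        return poolBytes / (2L * numActiveTasks);
    }

    /**
     * True if a task holding heldBytes would block: it is below its 1/(2N) share and the
     * free memory cannot bring it up to that share (the request size is ignored here).
     */
    static boolean wouldWait(long heldBytes, long freeBytes, long poolBytes, int numActiveTasks) {
        long min = minPerTask(poolBytes, numActiveTasks);
        return heldBytes < min && freeBytes < min - heldBytes;
    }

    public static void main(String[] args) {
        long pool = 1000;  // hypothetical pool size in bytes
        int tasks = 4;     // hypothetical number of concurrently running tasks
        System.out.println("max per task: " + maxPerTask(pool, tasks));                // 250
        System.out.println("min per task: " + minPerTask(pool, tasks));                // 125
        System.out.println("waits with 100 free: " + wouldWait(0, 100, pool, tasks));  // true
    }
}
```

In this model a task waits only while it is below its minimum share, so a task that never gets memory back from the others stays blocked, which matches the symptom of the job not finishing.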

Answers (1)

user1989252

In this command

    rs.coalesce(1, true).saveAsTextFile(path + "rs");

you are reducing the data to a single partition, so all of it ends up in one task on one node. You need to increase the number of partitions.
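To see why the partition count matters, the following plain-Java sketch (no Spark dependency) counts how many records each partition receives when records are spread round-robin, which is roughly how a shuffling `coalesce(n, true)` redistributes data. It is a simplified, hypothetical model: Spark starts each input partition at a random target partition, while this sketch always starts at 0 so the result is deterministic. `PartitionLoadSketch` and its methods are illustrative names only.

```java
import java.util.Arrays;

// Simplified model (an assumption): records are assigned round-robin to the
// target partitions, starting at partition 0. Names are hypothetical.
final class PartitionLoadSketch {

    /** Returns how many of numRecords records land in each of numPartitions partitions. */
    static long[] recordsPerPartition(long numRecords, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        long[] counts = new long[numPartitions];
        for (long i = 0; i < numRecords; i++) {
            counts[(int) (i % numPartitions)]++;  // round-robin assignment
        }
        return counts;
    }

    /** Size of the busiest partition, i.e. the most records any single task must process. */
    static long largestPartition(long numRecords, int numPartitions) {
        return Arrays.stream(recordsPerPartition(numRecords, numPartitions)).max().orElse(0);
    }

    public static void main(String[] args) {
        long records = 1_000_000L;  // hypothetical record count
        System.out.println("1 partition:   " + largestPartition(records, 1));   // 1000000
        System.out.println("10 partitions: " + largestPartition(records, 10));  // 100000
    }
}
```

With one partition a single task has to hold and write every record; with ten, the busiest task handles a tenth of them.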

Try something like this, choosing the number of partitions to suit your data size:

    rs.coalesce(10, true).saveAsTextFile(path + "rs");
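Separately from the partition count, the code in the question broadcasts an RDD and calls `lookup` on it inside `mapToPair`. Spark only supports invoking RDD transformations and actions from the driver, not inside another RDD's transformation, so that pattern is not supported. Common alternatives are `noFirstIndexed.leftOuterJoin(noHeaderIndexed)`, or collecting the smaller side with `collectAsMap()` and broadcasting the resulting `Map`. The sketch below models the second approach in plain Java (no Spark dependency), assuming the keys are unique indexes; `StrPair`, `lookupOrEmpty` and `leftOuterJoin` are hypothetical helpers, and a missing key or a null field becomes `""`, as with `s1`/`s2` in the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model (no Spark) of a map-side lookup join. In Spark, `lookup` would be the
// result of noHeaderIndexed.collectAsMap() broadcast to the executors, and `left` the
// records of noFirstIndexed. Class and method names are hypothetical.
final class LookupJoinSketch {

    /** Minimal stand-in for Tuple2<String, String>. */
    static final class StrPair {
        final String first;
        final String second;

        StrPair(String first, String second) {
            this.first = first;
            this.second = second;
        }
    }

    /** Returns the value stored for key, mapping a missing key or null fields to "". */
    static StrPair lookupOrEmpty(Map<Long, StrPair> lookup, long key) {
        StrPair found = lookup.get(key);
        if (found == null) {
            return new StrPair("", "");
        }
        return new StrPair(
                found.first != null ? found.first : "",
                found.second != null ? found.second : "");
    }

    /** Pairs every left record with its matching lookup value (a left outer join). */
    static Map<Long, StrPair[]> leftOuterJoin(Map<Long, StrPair> left, Map<Long, StrPair> lookup) {
        Map<Long, StrPair[]> result = new LinkedHashMap<>();
        for (Map.Entry<Long, StrPair> e : left.entrySet()) {
            result.put(e.getKey(), new StrPair[] {e.getValue(), lookupOrEmpty(lookup, e.getKey())});
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, StrPair> noFirst = new LinkedHashMap<>();
        noFirst.put(0L, new StrPair("a", "b"));
        noFirst.put(1L, new StrPair("c", "d"));

        Map<Long, StrPair> noHeader = new LinkedHashMap<>();
        noHeader.put(0L, new StrPair("x", null));  // null field becomes ""

        for (Map.Entry<Long, StrPair[]> row : leftOuterJoin(noFirst, noHeader).entrySet()) {
            StrPair l = row.getValue()[0];
            StrPair r = row.getValue()[1];
            System.out.println(row.getKey() + ": (" + l.first + "," + l.second + ") -> ("
                    + r.first + "," + r.second + ")");
        }
    }
}
```

Unlike `lookup(...).get(0)` in the question, which fails when a key has no match, this version falls back to empty strings for unmatched keys.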
