Bill

Reputation: 363

Spark shuffle write is super slow

Why is the Spark shuffle stage so slow for a 1.6 MB shuffle write and 2.4 MB of input? Also, why is the shuffle write happening on only one executor? I am running a 3-node cluster with 8 cores on each node.

Spark UI:

[two screenshots of the Spark UI]

Code:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// c is presumably a JavaRDD<String>; each record is keyed by its index name.
JavaPairRDD<String, String> javaPairRDD = c.mapToPair(new PairFunction<String, String, String>() {
    @Override
    public Tuple2<String, String> call(String arg0) throws Exception {
        try {
            if (org.apache.commons.lang.StringUtils.isEmpty(arg0)) {
                return new Tuple2<String, String>("", "");
            }
            return new Tuple2<String, String>(getESIndexName(arg0), arg0);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("******* exception in getESIndexName");
        }
        // Records that fail key extraction fall back to an empty key and value.
        return new Tuple2<String, String>("", "");
    }
});

java.util.Map<String, Iterable<String>> map1 = javaPairRDD.groupByKey().collectAsMap();

Upvotes: 3

Views: 7400

Answers (1)

vaquar khan

Reputation: 11479

Why is the shuffle write happening only on one executor:

Please check how many partitions your RDD has. The Spark UI shows this, as in the following screenshot:

[screenshot of the Spark UI]

I think your RDD has only one partition instead of 8 or more, which is what it would need in order to use all the executors. Repartitioning spreads the data out:

rdd = rdd.repartition(8) 

Avoiding shuffle: "Fewer stages, run faster"

Shuffling is a process of redistributing data across partitions (aka repartitioning) that may or may not cause moving data across JVM processes or even over the wire (between executors on separate machines).

By default, shuffling doesn't change the number of partitions. Since you have only one partition, all the work lands on a single task, which is why it looks slow.

How to avoid a shuffle:
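To make the one-partition effect concrete, here is a minimal plain-Java sketch (no Spark dependency) of hash partitioning: each key goes to the partition given by its hash code modulo the number of partitions, made non-negative, which is the rule Spark's HashPartitioner applies. The class name, method names, and sample keys are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {
    // Non-negative modulo of the key's hash code (hypothetical helper).
    public static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Groups keys by the partition they would be assigned to.
    public static Map<Integer, List<String>> assign(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : keys) {
            int p = partitionFor(key, numPartitions);
            byPartition.computeIfAbsent(p, ignored -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // Sample keys are assumptions, standing in for index names.
        List<String> keys = Arrays.asList("index-a", "index-b", "index-c", "index-d");
        // With one partition every key maps to partition 0.
        System.out.println("1 partition:  " + assign(keys, 1));
        // With eight partitions the keys can spread over several partitions.
        System.out.println("8 partitions: " + assign(keys, 8));
    }
}
```

With a single partition there is only one task to schedule, so only one executor does the shuffle write no matter how many cores the cluster has. Note that more partitions only help if there are enough distinct keys to spread across them.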

  • When both RDDs have duplicate keys, the join can cause the size of the data to expand dramatically. It may be better to perform a distinct or combineByKey operation to reduce the key space, or to use cogroup to handle duplicate keys instead of producing the full cross product. By using smart partitioning during the combine step, it is possible to prevent a second shuffle in the join.

  • If keys are not present in both RDDs you risk losing your data unexpectedly. It can be safer to use an outer join, so that you are guaranteed to keep all the data in either the left or the right RDD, then filter the data after the join.

  • If one RDD has some easy-to-define subset of the keys in the other, you may be better off filtering or reducing before the join to avoid a big shuffle of data that you will ultimately throw away anyway.

  • In order to join data, Spark needs the data that is to be joined (i.e., the data based on each key) to live on the same partition. The default implementation of a join in Spark is a shuffled hash join. The shuffled hash join ensures that data on each partition will contain the same keys by partitioning the second dataset with the same default partitioner as the first, so that the keys with the same hash value from both datasets are in the same partition. While this approach always works, it can be more expensive than necessary because it requires a shuffle. The shuffle can be avoided if:

    1. Both RDDs have a known partitioner.

    2. One of the datasets is small enough to fit in memory, in which case we can do a broadcast hash join.

Note that if the RDDs are colocated, the network transfer can be avoided along with the shuffle. Always persist after repartitioning.
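The idea behind a broadcast hash join can be sketched in plain Java, without Spark: the small dataset is held as an in-memory hash map that every worker would receive, and the large dataset is streamed past it, so no records of the large side need to be moved by key. This is an illustration of the technique only, not Spark's implementation; the class name, method name, and sample rows are hypothetical. It also shows the expansion from duplicate keys mentioned above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastJoinSketch {
    // Inner-joins (key, value) rows of the large side against a small lookup
    // table that is assumed to fit in memory on every worker.
    public static List<String> join(List<String[]> large, Map<String, List<String>> small) {
        List<String> out = new ArrayList<>();
        for (String[] row : large) {               // row[0] = key, row[1] = value
            List<String> matches = small.get(row[0]);
            if (matches == null) {
                continue;                          // inner join: unmatched keys are dropped
            }
            for (String match : matches) {         // duplicate keys yield every pairing
                out.add("(" + row[1] + ", " + match + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> large = Arrays.asList(
            new String[] {"key1", "R1"},
            new String[] {"key1", "R2"},
            new String[] {"key2", "R4"});
        Map<String, List<String>> small = new HashMap<>();
        small.put("key1", Arrays.asList("R3", "R5"));

        System.out.println(join(large, small));
        // prints [(R1, R3), (R1, R5), (R2, R3), (R2, R5)]
    }
}
```

Two rows on each side sharing key1 produce four output rows, while the key2 row disappears because the small side has no match for it. An outer join would keep that row instead.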

  • DataFrame joins: joining data between DataFrames is one of the most common multi-DataFrame transformations. The standard SQL join types are all supported and can be specified as the joinType in df.join(otherDf, sqlCondition, joinType) when performing a join. As with joins between RDDs, joining with nonunique keys will result in the cross product: if the left table has R1 and R2 with key1 and the right table has R3 and R5 with key1, the output will contain (R1, R3), (R1, R5), (R2, R3), and (R2, R5).

  • Using a self join and a lit(true), you can produce the cartesian product of your Dataset, which can be useful but also illustrates how joins (especially self joins) can easily result in unworkable data sizes.

  • Use a broadcast join. With a broadcast join, you can very effectively join a large table (fact) with relatively small tables (dimensions) by avoiding sending all the data of the large table over the network. You can use the broadcast function to mark a dataset to be broadcast when it is used in a join operator. Spark uses the spark.sql.autoBroadcastJoinThreshold setting to control the size of a table that will be broadcast to all worker nodes when performing a join.

  • Use the same partitioner. If two RDDs have the same partitioner, the join will not cause a shuffle. Note however, that the lack of a shuffle does not mean that no data will have to be moved between nodes. It's possible for two RDDs to have the same partitioner (be co-partitioned) yet have the corresponding partitions located on different nodes (not be co-located). This situation is still better than doing a shuffle, but it's something to keep in mind. Co-location can improve performance, but is hard to guarantee.

  • If the data is huge and/or your cluster cannot grow, to the point that the job runs out of memory (OOM), use a two-pass approach. First, repartition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then join the sub-partitions serially in a loop, appending to the same final result table.

  • https://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

  • https://medium.com/@foundev/you-won-t-believe-how-spark-shuffling-will-probably-bite-you-also-windowing-e39d07bf754e
  • https://blog.cloudera.com/blog/2015/01/improving-sort-performance-in-apache-spark-its-a-double/

  • https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-rdd-shuffle.html

Upvotes: 4
