jUsr

Reputation: 311

What affects the amount of data shuffled in Spark

For example, I'm executing some queries on Spark, and in the Spark UI I can see that some queries have more shuffle than others; this shuffle seems to be the amount of data read locally plus the data read between executors.

But there is one thing I'm not understanding: for example, the query below loaded 7GB from HDFS, yet the shuffle read + shuffle write is more than 10GB. I have seen other queries that also load 7GB from HDFS where the shuffle is only about 500KB. So I'm not understanding this, can you please help? Is the amount of data shuffled not related to the data read from HDFS?

select
  nation, o_year, sum(amount) as sum_profit
from
  (select
     n_name as nation, year(o_orderdate) as o_year,
     l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
   from
     orders o join
     (select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost
      from part p join
        (select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey,
                n_name, ps_supplycost
         from partsupp ps join
           (select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey,
                   l_orderkey, n_name
            from
              (select s_suppkey, n_name
               from nation n join supplier s on n.n_nationkey = s.s_nationkey
              ) s1 join lineitem l on s1.s_suppkey = l.l_suppkey
           ) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey
        ) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey
     ) l3 on o.o_orderkey = l3.l_orderkey
  ) profit
group by nation, o_year
order by nation, o_year desc;

Upvotes: 2

Views: 581

Answers (2)

Preeti Khurana

Reputation: 370

The shuffle is Spark's mechanism for re-distributing data so that it's grouped differently across partitions. This typically involves copying data across executors and machines. So it's pretty clear that the amount of data shuffled does not really depend on the amount of input data; rather, it depends on which operations you perform on that data, since those are what cause data to move between executors (and hence machines). Please go through http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations to understand why shuffling is a costly process.
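As a rough illustration of which operations move data (just a sketch; the sc variable, the HDFS path and the RDD names below are placeholders, not taken from your job), narrow transformations such as map and filter stay inside their partitions, while wide transformations such as reduceByKey, groupByKey or join are the ones that shuffle:

// Narrow transformations work within each partition, so nothing moves over the network.
val lines   = sc.textFile("hdfs:///some/path")       // just reads partitions
val trimmed = lines.map(_.trim).filter(_.nonEmpty)    // still per-partition, no shuffle
// Wide transformations (reduceByKey, groupByKey, join, distinct, repartition, ...)
// need all rows with the same key on the same executor, so they trigger a shuffle.
val counts = trimmed.map((_, 1)).reduceByKey(_ + _)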

Looking at the query you have pasted, you are doing a lot of join operations (I haven't looked deeply enough to understand the ultimate result you are computing), and that definitely calls for moving data across partitions. The problem can be handled by revisiting the query and optimizing it, or by manipulating or pre-processing your input data in a way that leads to less data movement (for example, colocating the data that has to be joined so that it falls in the same partition, as sketched below). Again, this is just an example and you have to determine from your use case what works best for you.
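Here is a minimal sketch of that colocation idea with the RDD API. The case classes, field names and join key are made up for illustration, and the tiny in-memory samples are only there so the snippet runs on its own:

import org.apache.spark.HashPartitioner
// Made-up record types, only to keep the sketch concrete.
case class Order(orderKey: Long, orderDate: String)
case class LineItem(orderKey: Long, extendedPrice: Double)
// Imagine these come from your real tables; tiny samples here just so it runs.
val orders    = sc.parallelize(Seq(Order(1L, "1996-01-02"), Order(2L, "1996-12-01")))
val lineItems = sc.parallelize(Seq(LineItem(1L, 17954.55), LineItem(2L, 34850.16)))
// Lay both sides out by the join key with the same partitioner and keep that layout.
val byKey = new HashPartitioner(200)
val ordersByKey    = orders.keyBy(_.orderKey).partitionBy(byKey).cache()
val lineItemsByKey = lineItems.keyBy(_.orderKey).partitionBy(byKey).cache()
// Both inputs now share a partitioner, so the join can match keys
// partition-by-partition instead of shuffling the rows again.
val joined = ordersByKey.join(lineItemsByKey)

The partitionBy calls shuffle once to create that layout, but after that the join (and anything else keyed the same way) reuses it instead of moving the data again.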

Upvotes: 2

samthebest

Reputation: 31515

I highly recommend reading what I consider to be the paper explaining the MapReduce programming model.

Basically, no: it isn't the amount of data on HDFS (or whatever the source is) that determines how much data is shuffled. I'll try to explain using three examples:

Example 1. Amount of data shuffled is less than input data:

val wordCounts = words.map((_, 1)).reduceByKey(_ + _)

Here we count the number of words (for each key) within each partition, then shuffle only those per-partition results. Once we have shuffled the sub-counts, we add them up. So the amount of data we shuffle is related to the number of counts, which in this case means the number of unique words.

If we only had one unique word, we would shuffle a lot less data than the input; in fact, only about as many counts as there are threads (a tiny amount).

Hypothetically, if every word were unique, we would shuffle more data (read the paper for details). So the amount of data shuffled in this example is related to how many unique keys (unique words) we have.

Example 2. Amount of data shuffled is same as input data:

val wordCounts = words.map((_, 1)).groupByKey().mapValues(_.size)

Here, we group all the words together, then we count how many there are. So we will need to shuffle all the data around.

Example 3. Amount of data shuffled is more than input data:

val silly =
  words
    .map(word => (word, generateReallyLongString()))
    .groupByKey()

Here our map stage maps every word to a really long random string, then we group them all together by word. We are generating more data than the input, so we will also shuffle more data than the input.

Upvotes: 3
