svs teja

Reputation: 1027

Reduce function in spark across partitions pyspark

I have written a sample function using Spark in Python. The function is as follows:

#!/usr/bin/env python
from __future__ import print_function
from pyspark.sql import SparkSession
import os
import sys
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.4"

spark = SparkSession \
    .builder \
    .appName("testpython") \
    .getOrCreate()
rdd1 = spark.sparkContext.parallelize([1,6,5,2,99,1000,100009,10000,139,44,45343,23234,34])
rdd3 = rdd1.reduce(lambda x, y: x + 1)
print(rdd3)

In the reduce function we passed (lambda x, y: x + 1); the expected result for the list above should be 13, but the result comes out as 6.

Can anyone explain why the result is 6 instead of 13? Is it because of the division of data across partitions in Spark?
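
For reference, a quick way to check that suspicion (a sketch, not part of the original script; getNumPartitions and glom are standard RDD methods):

print(rdd1.getNumPartitions())   # how many partitions the list was split into
print(rdd1.glom().collect())     # the actual elements held by each partition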

Console output:

/usr/bin/python3.4 /home/PycharmProjects/sampleproject/ttestexmple.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/07/20 17:45:14 WARN NativeCodeLoader: Unable to load native-hadoop 
17/07/20 17:45:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

6

Process finished with exit code 0

Upvotes: 1

Views: 4776

Answers (2)

user4601931

Reputation: 5294

To be more specific, reduce expects (or rather demands) a binary function that is associative and commutative, i.e., a function f(x, y) = x op y such that

x op (y op z) = (x op y) op z

and

x op y = y op x

Without these requirements being satisfied, you have no way to guarantee that the end result will be the same when combining intermediate results from different partitions. The associativity ensures that the computation can be parallelized at all; the commutativity ensures that the parallel computations can be reduced to the same final result (since you don't know the order in which the intermediate results will be eventually combined).

You can easily see that your function does not satisfy either condition, and hence cannot be used reliably with reduce:

x op (y op z) = x op (y + 1) = x + 1

whereas

(x op y) op z = (x + 1) op z = x + 2

which are not equal for any value of x. Similarly,

x op y = x + 1

which is equal to y op x if and only if x = y.
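
If the intent was to count the 13 elements, here is a sketch of one way to do it with a function that is associative and commutative (assuming the same rdd1 as in the question):

# Map every element to 1, then reduce with addition; addition is associative
# and commutative, so the result does not depend on partitioning.
count_via_reduce = rdd1.map(lambda x: 1).reduce(lambda x, y: x + y)
print(count_via_reduce)  # 13

# Equivalently, RDD.count() gives the same answer directly.
print(rdd1.count())      # 13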

Upvotes: 0

sau

Reputation: 1356

Yes, you are absolutely right. You can make sure your RDD lies on only one partition by using:

rdd1 = rdd1.coalesce(1)
rdd2 = rdd1.reduce(lambda x, y: x + 1)

Now you will get the expected answer.

The reason is that your RDD has more than one partition and you are using a reduce function that never uses y. Spark first reduces each partition separately and then combines the per-partition results with the same function. So let's say your RDD has two partitions: your reduce effectively computes (reduce on partition 1) op (reduce on partition 2), which with this lambda is just the reduce result of partition 1 plus 1.
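
To illustrate (a sketch, not from the original post): parallelize the same list with different partition counts and watch the result of the same reduce change, while it stays stable once everything is in a single partition.

data = [1, 6, 5, 2, 99, 1000, 100009, 10000, 139, 44, 45343, 23234, 34]
for num_partitions in (1, 2, 4):
    rdd = spark.sparkContext.parallelize(data, num_partitions)
    # Spark reduces each partition first, then combines the per-partition
    # results with the same lambda, so a function that ignores y gives a
    # partition-dependent answer.
    print(num_partitions, rdd.reduce(lambda x, y: x + 1))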

Upvotes: 2
