Reputation: 3071
For testing purposes, while I don't have a production cluster, I am using Spark locally:
from pyspark import SparkConf, SparkContext

print('Setting SparkContext...')
sconf = SparkConf()
sconf.setAppName('myLocalApp')
sconf.setMaster('local[*]')  # run locally, using all available cores
sc = SparkContext(conf=sconf)
print('Setting SparkContext...OK!')
Also, I am using a very small dataset: only 20 rows in a PostgreSQL database (~2 KB).
My code is quite simple as well: it only groups the 20 rows by a key and applies a trivial map operation:
params = [object1, object2]
rdd = df.rdd.keyBy(lambda x: (x.a, x.b, x.c)) \
    .groupByKey() \
    .mapValues(lambda value: self.__data_interpolation(value, params))

def __data_interpolation(self, data, params):
    # TODO: only for testing
    return data
What bothers me is that the whole execution takes about 5 minutes!!
Inspecting the Spark UI, I see that most of the time was spent in Stage 6, the keyBy method. (Stage 7, the collect() method, was also slow.)
Some info (Spark UI screenshots, not reproduced here):
These numbers make no sense to me. Why do I need 22 tasks, executing for 54 sec, to process less than 1 KB of data?
Could it be a network issue, such as trying to resolve the IP address of localhost? I don't know. Any clues?
Upvotes: 9
Views: 3221
Reputation: 3254
It appears the main reason for the slow performance in your code snippet is the use of groupByKey(). The issue with groupByKey is that it shuffles all of the key-value pairs, so a lot of data is transferred unnecessarily. A good reference that explains this issue is Avoid GroupByKey.
To work around this issue, you can try reduceByKey, which should be faster (more info is also included in the Avoid GroupByKey link above).
By the way, reviewing the Spark UI diagram above, the #22 refers to the task # within the DAG (not the number of tasks executed).
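To make the groupByKey versus reduceByKey difference concrete, here is a minimal plain-Python sketch. It is not Spark code, only a simulation of the idea. The two "partitions", the helper names, and the sum function are all hypothetical, chosen for illustration. It counts how many records would cross the shuffle in each style, assuming the per-key work can be written as an associative, commutative function (which is what reduceByKey requires).

```python
from collections import defaultdict

# Hypothetical input: two "partitions" of (key, value) pairs.
partitions = [
    [("a", 1), ("a", 2), ("b", 3)],
    [("a", 4), ("b", 5), ("b", 6)],
]

def shuffle_group_by_key(parts):
    """groupByKey-style: every record crosses the shuffle unchanged."""
    shuffled = [pair for part in parts for pair in part]
    grouped = defaultdict(list)
    for key, value in shuffled:
        grouped[key].append(value)
    return len(shuffled), dict(grouped)

def shuffle_reduce_by_key(parts, func):
    """reduceByKey-style: combine inside each partition, then shuffle."""
    shuffled = []
    for part in parts:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        # Only one record per key per partition is sent on.
        shuffled.extend(local.items())
    merged = {}
    for key, value in shuffled:
        merged[key] = func(merged[key], value) if key in merged else value
    return len(shuffled), merged

grouped_count, grouped = shuffle_group_by_key(partitions)
reduced_count, reduced = shuffle_reduce_by_key(partitions, lambda x, y: x + y)

print("records shuffled, groupByKey style:", grouped_count)
print("records shuffled, reduceByKey style:", reduced_count)
```

If the per-key operation needs all values at once (as an interpolation over the whole group might), this rewrite does not apply directly and grouping may still be needed.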
HTH!
Upvotes: 3
Reputation: 13946
I suppose "postgresql" is the key to solving this puzzle.
keyBy is probably the first operation that really uses the data, so its execution time is longer because it needs to fetch the data from the external database. You can verify this by adding the following at the beginning:
df.cache()
df.count() # to fill the cache
df.rdd.keyBy....
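One way to see where the time goes is to time each step separately. The helper below is a small sketch in plain Python; the name timed is hypothetical and is not part of Spark. The demo call uses a stand-in computation so the snippet runs on its own.

```python
import time

def timed(label, action):
    """Run a zero-argument callable and report how long it took."""
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print(f"{label}: {elapsed:.2f} s")
    return result, elapsed

# Stand-in action so the example is self-contained; with the DataFrame
# from the question you would pass df.count instead (after df.cache()).
result, seconds = timed("stand-in action", lambda: sum(range(1000)))
```

With the code above, timed("load from database", df.count) followed by a second timed call around the keyBy pipeline and its collect() would show whether the database read or the grouping dominates.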
If I am right, you need to optimize the database. It may be:
Upvotes: 1
Reputation: 85
From what I have seen happening on my system while running Spark:
When we run a Spark job, it internally creates map and reduce tasks and runs them. In your case, to process the data you have, it created 22 such tasks. The bigger the data, the larger this number may be.
Hope this helps.
Upvotes: 0