guilhermecgs

Reputation: 3071

Spark on localhost

For testing purposes, while I don't have a production cluster, I am running Spark locally:

from pyspark import SparkConf, SparkContext

print('Setting SparkContext...')
sconf = SparkConf()
sconf.setAppName('myLocalApp')
sconf.setMaster('local[*]')  # use all local cores
sc = SparkContext(conf=sconf)
print('Setting SparkContext...OK!')

Also, I am using a very small dataset, consisting of only 20 rows in a PostgreSQL database (~2 KB).

My code is quite simple as well: it only groups the 20 rows by a key and applies a trivial map operation

params = [object1, object2]
rdd = df.rdd.keyBy(lambda x: (x.a, x.b, x.c)) \
                          .groupByKey() \
                          .mapValues(lambda value: self.__data_interpolation(value, params))


def __data_interpolation(self, data, params):
    # TODO: only for testing
    return data

What bothers me is that the whole execution takes about 5 minutes!!

Inspecting the Spark UI, I see that most of the time was spent in Stage 6, the byKey method. (Stage 7, the collect() method, was also slow...)

Some info:

[Spark UI screenshots: stage and task details]

These numbers make no sense to me... Why do I need 22 tasks, executing for 54 seconds, to process less than 1 KB of data?

Could it be a network issue, e.g. Spark trying to resolve the IP address of localhost? I don't know... Any clues?

Upvotes: 9

Views: 3221

Answers (3)

Denny Lee

Reputation: 3254

It appears the main reason for the slow performance in your code snippet is the use of groupByKey(). The issue with groupByKey is that it ends up shuffling all of the key-value pairs, resulting in a lot of data being transferred unnecessarily. A good reference explaining this issue is Avoid GroupByKey.

To work around this issue, you can:

  1. Try using reduceByKey, which should be faster (more info is in the Avoid GroupByKey link above).
  2. Use DataFrames (instead of RDDs), as DataFrames include performance optimizations (and the DataFrame groupBy is faster than the RDD version). Also, since you're using Python, DataFrames avoid the Python-to-JVM serialization overhead of PySpark RDDs. More information on this can be found in PySpark Internals. A sketch of both options follows this list.
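
For illustration, a minimal sketch of both options, assuming df is the DataFrame from the question with columns a, b and c; the count per key is only a stand-in for the real __data_interpolation logic, which would need to be rewritten as a reduce function or a DataFrame aggregation:

from pyspark.sql import functions as F

# Option 1: reduceByKey combines values map-side before the shuffle,
# unlike groupByKey, which ships every value across the network.
counts_rdd = (df.rdd
              .map(lambda x: ((x.a, x.b, x.c), 1))
              .reduceByKey(lambda u, v: u + v))

# Option 2: a DataFrame groupBy goes through the Catalyst optimizer and
# stays in the JVM instead of round-tripping rows through Python.
counts_df = df.groupBy('a', 'b', 'c').agg(F.count('*').alias('n'))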

By the way, reviewing the Spark UI diagram above, the #22 refers to the task # within the DAG (not the number of tasks executed).

HTH!

Upvotes: 3

Mariusz

Reputation: 13946

I suppose the "postgresql" part is the key to solving this puzzle.

keyBy is probably the first operation that actually uses the data, so its execution time is longer because it has to fetch the data from the external database. You can verify this by adding the following at the beginning:

df.cache()
df.count() # to fill the cache
df.rdd.keyBy....

If I am right, you need to optimize the database access. The cause may be (see the timing sketch after this list):

  1. Network issue (slow network to DB server)
  2. Complicated (and slow) SQL query on this database (try running it in the psql shell)
  3. Some authorization difficulties on DB server
  4. Problem with JDBC driver you use
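
If the fetch itself is the suspect, a rough way to time it in isolation is shown below. This is only a sketch, assuming the sc from the question; the JDBC URL, table name and credentials are placeholders, not values from the question:

import time
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

t0 = time.time()
df = sqlContext.read.jdbc(
    url='jdbc:postgresql://localhost:5432/mydb',   # placeholder URL
    table='mytable',                               # placeholder table
    properties={'user': 'me', 'password': 'secret',
                'driver': 'org.postgresql.Driver'})
df.cache()
print('rows:', df.count())                      # forces the actual fetch
print('fetch took %.1f s' % (time.time() - t0))

If this alone takes minutes, the problem is on the database side, not in Spark.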

Upvotes: 1

user7005835

Reputation: 85

From what I have seen happening on my system while running Spark:

When you run a Spark job, it internally creates map and reduce tasks and runs them. In your case, it created 22 such tasks to process your data. The bigger the dataset, the larger this number can be.
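
If the number of tasks is the concern, here is a small sketch of how to inspect and reduce the number of partitions (and hence tasks), assuming the rdd built in the question:

print(rdd.getNumPartitions())        # how many tasks the next stage will run

small_rdd = rdd.coalesce(1)          # 20 rows easily fit in a single partition
print(small_rdd.getNumPartitions())  # 1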

Hope this helps.

Upvotes: 0
