RamsesXVII

Reputation: 305

PySpark very slow on Amazon cluster

I'm working on a Spark project and trying to execute the app on an Amazon cluster. Performance is very slow, even on small files. I don't want a solution, just an opinion about the possible reasons for the slowness.

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("appName").getOrCreate()
sc = spark.sparkContext

rec = sc.textFile(sys.argv[1])
#  rec = sc.parallelize(records.collect())

# split the tab-separated lines, keep rows whose 7th field is >= 4,
# and key them by the 2nd field
a = (rec.map(lambda line: line.split("\t"))
        .filter(lambda x: int(x[6]) >= 4)
        .map(lambda x: (x[1], [x[2], x[6]])))

# self-join on that key, keep one ordering of each value pair, regroup by
# the pair, drop groups with 2 or fewer distinct keys, sort and write out
(a.join(a)
  .filter(lambda kv: kv[1][0][0] < kv[1][1][0])
  .map(lambda kv: ((kv[1][1][0], kv[1][0][0]), kv[0]))
  .groupByKey()
  .filter(lambda kv: len(set(kv[1])) > 2)
  .sortBy(lambda kv: kv[0])
  .saveAsTextFile(sys.argv[2]))

Upvotes: 0

Views: 1525

Answers (2)

Garren S

Reputation: 5792

This code sequence alone may be causing you the most grief:

records = sc.textFile(sys.argv[1])
rec = sc.parallelize(records.collect())

Why? You read the file in as an RDD via the SparkContext's textFile function, and it gets materialized across the cluster. Then, and here's the kicker, by calling records.collect() you tell the cluster to send ALL the data BACK to the driver (whatever machine launched the job), and finally you rebuild the RDD from that locally collected list. Just use the records RDD in place of the rec RDD.
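For concreteness, a minimal sketch of that fix, reusing the variable names and column indices from the question:

import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("appName").getOrCreate()
sc = spark.sparkContext

# textFile already returns a distributed RDD; no collect()/parallelize() round trip
records = sc.textFile(sys.argv[1])

a = (records.map(lambda line: line.split("\t"))
            .filter(lambda x: int(x[6]) >= 4)
            .map(lambda x: (x[1], [x[2], x[6]])))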

Edit: It looks like you're doing a cross join of a to itself. Is that intended and for what purpose?

groupByKey forces a shuffle of all the data, in contrast to reduceByKey, which combines values on each partition before anything goes over the network.
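As a toy illustration of the difference (using the sc from the question, not its data):

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

# groupByKey ships every individual value across the network before combining anything
grouped_counts = pairs.groupByKey().mapValues(lambda vals: sum(vals))

# reduceByKey combines values within each partition first, so far less data is shuffled
reduced_counts = pairs.reduceByKey(lambda x, y: x + y)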

Stop using the SparkContext; it's there for backwards compatibility. Use your SparkSession's .read.option("delimiter", "\t").csv(file path), which creates a DataFrame instead of an RDD and parses the tab-delimited lines for you into generic Row objects. Critically, by using the DataFrame APIs you get far better performance (courtesy of Tungsten and Catalyst). This matters especially since you're using PySpark: with Spark 2.x and DataFrames, Python performance is in line with Scala on the JVM, whereas using RDDs with Python means the Python interpreters do the work (so Scala is much faster).
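A rough sketch of that approach, reusing the spark session and sys.argv input path from the question, with Spark's default _c0, _c1, ... column names standing in for real ones:

from pyspark.sql import functions as F

# read the tab-delimited file straight into a DataFrame of Row objects
df = spark.read.option("delimiter", "\t").csv(sys.argv[1])

# equivalent of the RDD-side filter: keep rows whose 7th column is >= 4
filtered = (df.filter(F.col("_c6").cast("int") >= 4)
              .select("_c1", "_c2", "_c6"))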

Upvotes: 1

Dat Tran

Reputation: 2392

Well, PySpark, or Spark in general, is made for use cases where you need to deal with a lot of data. It is expected to be slow for small data. Here are the reasons:

  1. Depending on how many executors you have, PySpark needs to spin up a JVM for each of them first. Moreover, in the case of PySpark, additional overhead comes from the Python sub-processes, which need to be invoked as well (see PySpark Internals).
  2. The second reason is data shuffling. Your data might be shuffled around the network. In the local case the data is computed on the same node; with distributed data, the scheduler first needs to figure out where to put the data and then what to do with it.

So PySpark/Spark only shines when you need to do something with "big data"! I've seen many people get very disappointed at first, saying that Spark is slow, when they only used it on a very small amount of data. Hope this helps!

Upvotes: 0
