Ryan Deschamps
Ryan Deschamps

Reputation: 385

Is there any way to increase the efficiency of PySpark outputs?

I am trying to test the ability of PySpark to iterate over some very large (10s of GBs to 1s of TBs) data. For most scripts I find PySpark to have about the same efficiency as Scala code. In other cases (like the code below) I get serious speed problems ranging from 10 to 12 times slower.

path = "path/to/file"
spark = SparkSession.builder.appName("siteLinkStructureByDate").getOrCreate()
sc = spark.sparkContext   

df = RecordLoader.loadSomethingAsDF(path, sc, spark)
fdf = df.select(df['aDate'], df['aSourceUrl'], df['contentTextWithUrls'])
rdd = fdf.rdd
rddx = rdd.map (lambda r: (r.aDate, CreateAVertexFromSourceUrlAndContent(r.aSourceUrl, r.contentTextWithUrls)))\
 .flatMap(lambda r: map(lambda f: (r[0], ExtractDomain(f[0]), ExtractDomain(f[1])), r[1]))\
 .filter(lambda r: r[-1] != None)\
 .countByValue()

print([((x[0], x[1], x[2]), y) for x, y in rddx.items()]) 

We think we have isolated the problem to the .countByValue() (which returns a defaultdict), but applying countItems() or reduceByKey() produces pretty much the same results. We are also 99% sure the problem is not with the ExtractDomain or CreateAVertexFromSourceUrlAndContent (not the real names of the functions, just pseudocode to make it understandable).

So my question is first

  1. is there anything in this code that I can do to reduce the time?
  2. Is PySpark fundamentally that much slower than its Scala counterpart?
  3. Is there a way to replicate the flatmap using PySpark dataframes instead (understanding that dataframes are generally faster than RDD in Pyspark)?

Upvotes: 3

Views: 1134

Answers (1)

Alper t. Turker
Alper t. Turker

Reputation: 35229

The biggest problem here might be communication - Spark SQL (columnar format) -> plain Scala object -> pickle (Pyrolite) -> socket -> unpickle -> plain Python object. That's a lot of copying, converting and moving things.

there a way to replicate the flatmap using PySpark dataframes instead

Yes. It is called explode - but to be fair it is slowish as well.

understanding that dataframes are generally faster than RDD in Pyspark

That's usually true (Scala and Python both), but you might need udf to implement ExtractDomain or CreateAVertexFromSourceUrlAndContent - it is another slow thing. Just from the names you might be able to use parse_url_tuple.

Is PySpark fundamentally that much slower than its Scala counterpart?

It is somewhat slower. Normally not that slower on well tuned code. But implementation details are different - the same set of operation in both Scala and Python can be materialized in a different way.

is there anything in this code that I can do to reduce the time?

I'd recommend profiling first. Once you determine which part is responsible (conversions, merging) you can try to target it.

Upvotes: 2

Related Questions