robarthur1

Reputation: 507

Pyspark Out of Memory window function

I'm seeing a few scalability problems with a pyspark script I've written and was wondering if anyone would be able to shed a bit of light.

I have a very similar use case to the one presented here:

Separate multi line record with start and end delimiter

In that I have some multi-line data where there is a logical delimiter between records. E.g. the data looks like:

AA123
BB123
CCXYZ
AA321
BB321
CCZYX
...

Using the example in the previous answer, I've separated this into multiple records using a script like...

import os

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

# Played around with setting the available memory at runtime
spark = (
    SparkSession.builder
    .appName("TimetableSession")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

files = os.path.join("data", "*_lots_of_gzipped_files.gz")
# Wrap each line in a 1-tuple so toDF() can build a single column named "entry"
df = spark.sparkContext.textFile(files).map(lambda line: (line,)).toDF(["entry"])
df = df.withColumn("id", F.monotonically_increasing_id())

# No partition columns: the whole dataset forms a single window
w = Window.partitionBy().orderBy("id")
df = df.withColumn("AA_indicator", F.expr("case when entry like 'AA%' then 1 else 0 end"))
# !!!Blowing up with OOM errors here at scale!!!
df = df.withColumn("index", F.sum("AA_indicator").over(w))
df.show()

+--------------------+---+------------+-----+
|               entry| id|AA_indicator|index|
+--------------------+---+------------+-----+
|               AA123|  1|           1|    1|
|               BB123|  2|           0|    1|
|               CCXYZ|  3|           0|    1|
|               AA321|  4|           1|    2|
|               BB321|  5|           0|    2|
|               CCZYX|  6|           0|    2|
+--------------------+---+------------+-----+

This seems to work OK with data of a reasonable size (e.g. 50MB), but when I scale up to more than 1GB of data I see Java OOM errors. I see the same problem even when allocating more than 20GB of memory to spark.driver/executor.

I believe the problem is that the window is not partitioned, so everything is being collected into memory at once rather than being parallelised? But I might be way off the mark with this.

I'm running this script in a standalone docker container using the jupyter pyspark notebook https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook.

Any help in terms of a better approach to indexing 'records' or how to better approach the problem would be much appreciated.

Upvotes: 2

Views: 2992

Answers (1)

Alper t. Turker

Reputation: 35249

Probably because you use a window without PARTITION BY:

Window.partitionBy().orderBy('id')

In that case Spark doesn't distribute the data: it moves all rows into a single partition and processes them sequentially on one machine.

Having a lot of gzipped files makes it even worse, as gzip files are not splittable. Each file is therefore read in full by a single task, which can run out of memory as well.
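
To illustrate why a gzipped file cannot be split, here is a small sketch using only Python's standard gzip module (no Spark involved). The sample data and the helper name `readable_from_offset` are made up for illustration. It tries to decompress the same file once from the start and once from the middle of the compressed bytes; a reader that starts mid-stream has no header and no decompression state to work from.

```python
import gzip
import zlib


def readable_from_offset(blob: bytes, offset: int) -> bool:
    """Return True if blob[offset:] can be decompressed as gzip on its own."""
    try:
        gzip.decompress(blob[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        return False


# Hypothetical sample data shaped like the records in the question.
lines = "".join(f"AA{i}\nBB{i}\nCCXYZ\n" for i in range(1000)).encode()
blob = gzip.compress(lines)

print("from start: ", readable_from_offset(blob, 0))
print("from middle:", readable_from_offset(blob, len(blob) // 2))
```

This is why a reader has to consume each .gz file from its first byte, so one file cannot be shared between several tasks.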

Overall, this is not a workload that benefits from Spark.
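
If the indexing does have to stay in Spark, one commonly used way to avoid the single-partition window is a two-pass scheme: count the record starts in each partition, turn those counts into per-partition offsets, then compute the running index inside each partition. The sketch below is not from the question; it shows only the logic in plain Python, with lists standing in for partitions. The names `MARKER`, `count_starts` and `index_lines` are hypothetical. In Spark, functions like these could be applied per partition with `RDD.mapPartitionsWithIndex`, but that wiring is an assumption and is not shown. The scheme also assumes that partitions keep the lines in file order, which the `id`-based ordering in the question relies on too.

```python
from itertools import accumulate
from typing import Iterable, Iterator, List, Tuple

MARKER = "AA"  # record-start prefix, taken from the sample data in the question


def count_starts(lines: Iterable[str]) -> int:
    """Pass 1: number of record-start lines in one partition."""
    return sum(1 for line in lines if line.startswith(MARKER))


def index_lines(lines: Iterable[str], offset: int) -> Iterator[Tuple[str, int]]:
    """Pass 2: running record index, continuing from earlier partitions."""
    index = offset
    for line in lines:
        if line.startswith(MARKER):
            index += 1
        yield line, index


# Stand-in for Spark partitions; the second record spans both partitions.
partitions: List[List[str]] = [
    ["AA123", "BB123", "CCXYZ", "AA321"],
    ["BB321", "CCZYX", "AA555", "BB555"],
]

# Only one integer per partition has to be gathered in one place.
counts = [count_starts(p) for p in partitions]
# Number of record starts seen before each partition begins.
offsets = [0] + list(accumulate(counts))[:-1]

indexed = [
    row
    for part, offset in zip(partitions, offsets)
    for row in index_lines(part, offset)
]
for row in indexed:
    print(row)
```

Only the per-partition counts need to be brought together, so no single machine has to hold every row. This does not remove the gzip limitation: each .gz file would still be read by one task.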

Upvotes: 3
