Reputation: 507
I'm seeing a few scalability problems with a pyspark script I've written and was wondering if anyone would be able to shed a bit of light.
I have a very similar use case to the one presented here:
Separate multi line record with start and end delimiter
In that I have some multi-line data where there is a logical delimiter between records. E.g. the data looks like:
AA123
BB123
CCXYZ
AA321
BB321
CCZYX
...
Using the example in the previous answer, I've separated this into multiple records using a script like...
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import expr, monotonically_increasing_id, sum as spark_sum
import os

spark = (
    SparkSession.builder
    .appName("TimetableSession")
    # Played around with setting the available memory at runtime
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)

files = os.path.join("data", "*_lots_of_gzipped_files.gz")
# textFile returns an RDD of lines; wrap each line so it becomes an 'entry' column
df = spark.sparkContext.textFile(files).map(lambda line: (line,)).toDF(["entry"])
df = df.withColumn("id", monotonically_increasing_id())
w = Window.partitionBy().orderBy('id')
df = df.withColumn('AA_indicator', expr("case when entry like 'AA%' then 1 else 0 end"))
# !!!Blowing up with OOM errors here at scale!!!
df = df.withColumn('index', spark_sum('AA_indicator').over(w))
df.show()
+--------------------+---+------------+-----+
| entry| id|AA_indicator|index|
+--------------------+---+------------+-----+
| AA123| 1| 1| 1|
| BB123| 2| 0| 1|
| CCXYZ| 3| 0| 1|
| AA321| 4| 1| 2|
| BB321| 5| 0| 2|
| CCZYX| 6| 0| 2|
+--------------------+---+------------+-----+
This seems to work OK with data of a reasonable size (e.g. 50MB), but when I scale up to > 1GB of data I'm seeing Java OOM errors. I see the same problem even when allocating > 20GB of memory to the Spark driver/executor.
I believe the problem is that the window isn't partitioning the data, so everything is being collected into memory at once rather than being parallelised? But I might be way off the mark with this.
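For reference, this is roughly how I've been trying to confirm that, using the df and w from the script above (I'm not certain I'm reading the output correctly):
# Inspect the plan and the partition count of the windowed DataFrame.
df.explain()                      # the physical plan seems to show the whole dataset being
                                  # exchanged into a single partition for the Window step
print(df.rdd.getNumPartitions())  # how many partitions the result actually has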
I'm running this script in a standalone Docker container using the Jupyter pyspark-notebook image: https://github.com/jupyter/docker-stacks/tree/master/pyspark-notebook.
Any help in terms of a better approach to indexing 'records' or how to better approach the problem would be much appreciated.
Upvotes: 2
Views: 2992
Reputation: 35249
Probably because you use a window without PARTITION BY:
Window.partitionBy().orderBy('id')
In that case Spark doesn't distribute the data and processes all records on a single machine sequentially.
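For contrast, here is a sketch of a window Spark can actually distribute; the 'source_file' column is purely hypothetical, just to illustrate what a partitioning key buys you:
from pyspark.sql import Window
from pyspark.sql.functions import sum as spark_sum

# 'source_file' is a hypothetical grouping column; with a non-empty partitionBy,
# each group can be processed independently on a different executor.
w = Window.partitionBy('source_file').orderBy('id')
df = df.withColumn('index', spark_sum('AA_indicator').over(w))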
Having a lot of gzipped files makes it even worse, as gzip compression cannot be split. So each file is loaded on a single machine, and can OOM as well.
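A quick way to see the effect, and to get downstream work spread out again (a sketch, assuming spark and files from the question; 200 is just a placeholder):
rdd = spark.sparkContext.textFile(files)
print(rdd.getNumPartitions())   # roughly one partition per gzipped file, since .gz cannot be split

# Repartitioning spreads later stages across the cluster, but it shuffles rows,
# so any logic that depends on the original line order has to be handled first.
repartitioned = rdd.repartition(200)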
Overall this is not a workload where Spark provides much benefit.
Consider replacing the cumulative sum over a window with lower-level code, as shown in How to compute cumulative sum using Spark (a sketch of that idea for your case is included below).
This also seems to be relevant: Avoid performance impact of a single partition mode in Spark window functions
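A rough sketch of that idea (not tested at scale; it assumes spark and files from the question, and that the line order produced by textFile is the record order you want): count the 'AA' record starts per partition, turn those counts into per-partition offsets, then add a local running sum, so no single-partition window is needed.
rdd = spark.sparkContext.textFile(files)

# Tag each line with 1 if it starts a new record ('AA...'), else 0.
tagged = rdd.map(lambda line: (line, 1 if line.startswith('AA') else 0))

# Count record starts per partition; only one small number per partition
# comes back to the driver.
def count_starts(part_index, rows):
    yield (part_index, sum(flag for _, flag in rows))

counts = dict(tagged.mapPartitionsWithIndex(count_starts).collect())

# Convert the per-partition counts into a cumulative offset per partition.
offsets, running = {}, 0
for i in sorted(counts):
    offsets[i] = running
    running += counts[i]

# Within each partition, add the partition's offset to a local running sum.
def assign_index(part_index, rows):
    local = 0
    for line, flag in rows:
        local += flag
        yield (line, offsets[part_index] + local)

records = tagged.mapPartitionsWithIndex(assign_index).toDF(['entry', 'index'])
records.show()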
Upvotes: 3