naifmeh

Reputation: 408

Problem running a Pandas UDF on a large dataset

I'm currently working on a project and I'm having a hard time understanding how Pandas UDFs in PySpark work.

I have a Spark cluster with one master node (8 cores, 64GB) and two workers (16 cores and 112GB each). My dataset is quite large and divided into seven principal partitions, each consisting of ~78M rows across 70 columns. I defined a Pandas UDF to perform some operations on the dataset that can only be done in Python, on a Pandas dataframe.

The pandas UDF is defined this way:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def operation(pdf):
    # Some operations
    return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)

There is absolutely no way to get the Pandas UDF to work, as it crashes before even doing the operations. I suspect there is an OOM error somewhere. The code above runs for a few minutes before crashing with an error stating that the connection has been reset. However, if I call .toPandas() after filtering down to one partition and then display it, it runs fine with no error. The error seems to happen only when using a Pandas UDF.

I fail to understand how it works. Does Spark try to convert one whole partition at once (78M rows)? If so, what memory does it use, the driver's or the executor's? If it's the driver's, is all Python code executed on it?

The cluster is configured with the following:

Am I missing something, or is there just no way to run 78M rows through a Pandas UDF?

Upvotes: 7

Views: 6165

Answers (2)

bsauce

Reputation: 672

To answer the general question about using a Pandas UDF on a large pyspark dataframe:

If you're getting out-of-memory errors such as java.lang.OutOfMemoryError: GC overhead limit exceeded or java.lang.OutOfMemoryError: Java heap space, and increasing memory limits hasn't worked, ensure that pyarrow is enabled. It is disabled by default.

In pyspark, you can enable it using:

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

(On Spark 2.x the flag is named spark.sql.execution.arrow.enabled instead.)

More info here.

Upvotes: -1

user10938362

Reputation: 4151

Does Spark try to convert one whole partition at once (78M lines) ?

That's exactly what happens. Spark 3.0 adds support for chunked UDFs, which operate on iterators of Pandas DataFrames or Series, but if your operations can only be done in Python on a whole Pandas dataframe at once, these might not be the right choice for you.
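A minimal sketch of what such a chunked UDF looks like in Spark 3.0 (mapInPandas), assuming a made-up column name; the function receives an iterator of pandas DataFrame chunks, so peak memory is bounded by the Arrow batch size rather than the partition size:

```python
import pandas as pd

def operation_chunked(iterator):
    # Called once per partition; each `pdf` is one Arrow batch,
    # not the whole 78M-row partition.
    for pdf in iterator:
        pdf["value"] = pdf["value"] * 2  # placeholder for the real Python logic
        yield pdf

# In Spark 3.0+ this would be wired up as:
#   df.mapInPandas(operation_chunked, schema=df.schema)
#
# The function itself is plain Python, so it can be exercised locally:
chunks = [pd.DataFrame({"value": [1, 2]}), pd.DataFrame({"value": [3]})]
result = pd.concat(operation_chunked(iter(chunks)), ignore_index=True)
```

Note that this only helps if the logic works row-by-row or chunk-by-chunk; a grouped-map UDF still materializes each full group.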

If so, what memory does it use ? The driver memory? The executor's?

Each partition is processed locally on the respective executor, and data is passed to and from the Python worker using Arrow streaming.

Am I missing something or is there just no way to run 78M lines through a PandasUDF?

As long as you have enough memory to handle the Arrow input, output (especially if data is copied), auxiliary data structures, as well as JVM overhead, it should handle large datasets just fine.

But on such a tiny cluster, you'd be better off partitioning the output and reading the data directly with Pandas, without using Spark at all. That way you can use all the available resources (i.e. > 100GB per interpreter) for data processing, instead of wasting them on secondary tasks (leaving 16GB minus overhead per interpreter).

Upvotes: 5
