Emmy

Reputation: 60

Running into memory errors when deduping pyspark dataframe

I am somewhat new to pyspark and running into a problem with deduplicating a dataframe.

My dataframe has three fields: PersonId, PlaceId, and ThingId. Here is a sample:

import pandas as pd

PersonTest = [1, 1, 2, 2, 2, 3, 4]
PlaceTest = [['A', 'B'], ['A', 'B', 'C'], ['C'], ['C', 'D', 'E', 'F'], ['C', 'D', 'F'], ['C', 'D', 'F'], ['D', 'F']]
ThingTest = [9, 8, 7, 6, 5, 4, 3]

pandasdf = pd.DataFrame({'PersonId': PersonTest, 'PlaceId': PlaceTest, 'ThingId': ThingTest})

What I want to end up with is one row per PersonId, with PlaceId being the set of all the places and ThingId being the max of the ThingIds for that PersonId. So for the example, I should end up with a dataframe that looks like this:

PersonId   PlaceId        ThingId
1          [A, B, C]      9
2          [C, D, E, F]   7
3          [C, D, F]      4
4          [D, F]         3

I use this to create a Spark dataframe:

sparkdf = spark.createDataFrame(pandasdf, ['PersonId', 'PlaceId', 'ThingId'])
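For reference, this is roughly the aggregation I think I'm after on the toy data. It's only a sketch and I haven't been able to get anything like it to run on the full data; flatten, array_distinct and array_sort need Spark 2.4+:

from pyspark.sql import functions as F

# One row per PersonId: union of the PlaceId arrays, max of the ThingIds
expected = (
    sparkdf
    .groupBy('PersonId')
    .agg(
        F.array_sort(F.array_distinct(F.flatten(F.collect_list('PlaceId')))).alias('PlaceId'),
        F.max('ThingId').alias('ThingId'),
    )
)
expected.show(truncate=False)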

On the full data, deduplicating should reduce the number of rows from about 300,000 to 75,000. From here I tried a couple of things. First, I tried creating a separate dataframe by just dropping the duplicates, like this:

dropped_df = df_prop_spark.dropDuplicates(subset=['PersonId']).count()

I also tried to drop the duplicates and then collect the places. (I just realized I should have used collect_set rather than collect_list here.) Either way, I get an out-of-memory error:

from pyspark.sql.functions import collect_list

df_prop_spark.dropDuplicates(subset=['PersonId']).agg(collect_list('PlaceId')).show()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-db7f6a9bc71f> in <module>
----> 1 df_prop_spark.dropDuplicates(subset=['PersonId']).agg(collect_list('PlaceId')).show()

~/miniconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

~/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/miniconda3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
    129     def deco(*a, **kw):
    130         try:
--> 131             return f(*a, **kw)
    132         except py4j.protocol.Py4JJavaError as e:
    133             converted = convert_exception(e.java_exception)

~/miniconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o51.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, 10.100.0.161, executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
    at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space

Does anyone have any ideas?

Upvotes: 1

Views: 936

Answers (1)

jayrythium

Reputation: 767

What environment are you running this in? I would steer clear of pandas, since it keeps everything in memory, and create a Spark DataFrame directly instead; there's a sketch of that after the config block below. You could also just beef up your driver memory. This block assumes you're running a Spark app on a local machine and sets the driver memory to 10g (you can go as high as your system allows):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "10g") \
    .appName('your app name') \
    .getOrCreate()
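
As a rough sketch of the first point, using the sample data from the question (for your real data you would presumably read straight from the source with spark.read rather than building a Python list):

# Build the sample directly as a Spark DataFrame, no pandas in between
rows = [
    (1, ['A', 'B'], 9),
    (1, ['A', 'B', 'C'], 8),
    (2, ['C'], 7),
    (2, ['C', 'D', 'E', 'F'], 6),
    (2, ['C', 'D', 'F'], 5),
    (3, ['C', 'D', 'F'], 4),
    (4, ['D', 'F'], 3),
]
sparkdf = spark.createDataFrame(rows, ['PersonId', 'PlaceId', 'ThingId'])

From there you can run the groupBy/collect_list aggregation from the question. Note that in local mode the executors run inside the driver JVM, so spark.driver.memory above is the setting that actually gives the job more heap.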

Upvotes: 1
