Reputation: 60
I am fairly new to PySpark and am running into a problem deduplicating a DataFrame.
My DataFrame has three fields: PersonId, PlaceId, and ThingId. Here is a sample:
import pandas as pd

PersonTest = [1,1,2,2,2,3,4]
PlaceTest = [['A', 'B'],['A', 'B', 'C'],['C'],['C','D','E','F'],['C','D','F'],['C','D','F'],['D','F']]
ThingTest = [9,8,7,6,5,4,3]
pandasdf = pd.DataFrame({'PersonId' : PersonTest, 'PlaceId' : PlaceTest, 'ThingId' : ThingTest})
What I want to end up with is one row per PersonId, with PlaceId being the set of all that person's places and ThingId being the max of that person's ThingIds. So for the sample above, I should end up with a dataframe that looks like this:
PersonId  PlaceId       ThingId
1         [A, B, C]     9
2         [C, D, E, F]  7
3         [C, D, F]     4
4         [D, F]        3
I use this to create a Spark DataFrame:
sparkdf = spark.createDataFrame(pandasdf, ['PersonId', 'PlaceId', 'ThingId'])
On the real data, deduplicating on PersonId should reduce the number of rows from about 300,000 to 75,000. From here I tried a couple of things. First, I tried creating a separate dataframe by just dropping the duplicates, like this:
dropped_df = df_prop_spark.dropDuplicates(subset=['PersonId']).count()
I also tried dropping duplicates and collecting. (I just realized I should have used collect_set here; a sketch of what I mean is below the traceback.) Either way, I get an out-of-memory error:
df_prop_spark.dropDuplicates(subset=['PersonId']).agg(collect_list('PlaceId')).show()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-db7f6a9bc71f> in <module>
----> 1 df_prop_spark.dropDuplicates(subset=['PersonId']).agg(collect_list('PlaceId')).show()
~/miniconda3/lib/python3.7/site-packages/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
438 """
439 if isinstance(truncate, bool) and truncate:
--> 440 print(self._jdf.showString(n, 20, vertical))
441 else:
442 print(self._jdf.showString(n, int(truncate), vertical))
~/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
~/miniconda3/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
~/miniconda3/lib/python3.7/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o51.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3, 10.100.0.161, executor driver): java.lang.OutOfMemoryError: Java heap space
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
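To be concrete, the end result I'm after is roughly this aggregation (just a sketch using explode and collect_set on the toy sparkdf above; I assume the full data would hit the same memory problem unless something else changes):
from pyspark.sql import functions as F

# Sketch: flatten the PlaceId arrays, then collapse to one row per PersonId
# with the distinct set of places and the max ThingId.
result = (
    sparkdf
    .withColumn('PlaceId', F.explode('PlaceId'))      # one row per (PersonId, place, ThingId)
    .groupBy('PersonId')
    .agg(F.collect_set('PlaceId').alias('PlaceId'),    # distinct places per person
         F.max('ThingId').alias('ThingId'))            # max ThingId per person
)
result.show(truncate=False)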
Does anyone have any ideas?
Upvotes: 1
Views: 936
Reputation: 767
What environment are you using? I would also steer clear of pandas, since it stores everything in memory, and create a Spark DataFrame instead. You could also just beef up your driver memory. The block below assumes you are running a Spark app on a local machine and sets the driver memory to 10g (you can go as high as your system allows).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "10g") \
    .appName('your app name') \
    .getOrCreate()
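For example, the sample data from the question could be built as a Spark DataFrame directly, skipping the pandas step (a sketch using a few of the rows from the question):
from pyspark.sql import Row

# Sketch: build the toy rows as Spark Rows instead of going through pandas.
rows = [
    Row(PersonId=1, PlaceId=['A', 'B'], ThingId=9),
    Row(PersonId=1, PlaceId=['A', 'B', 'C'], ThingId=8),
    Row(PersonId=2, PlaceId=['C'], ThingId=7),
]
sparkdf = spark.createDataFrame(rows)

# For the real ~300k-row dataset, reading the source straight into Spark
# (e.g. spark.read.csv(...) or spark.read.parquet(...)) keeps the data out of pandas entirely.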
Upvotes: 1