Reputation: 363
I'm trying to run the sample PySpark PCA code from https://spark.apache.org/docs/2.2.0/ml-features.html#pca
I loaded a DataFrame with 5,000,000 records and 23,000 features. After running the PCA code I get the error below:
Py4JJavaError: An error occurred while calling o908.fit.
: java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1137)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeGramianMatrix(RowMatrix.scala:122)
at org.apache.spark.mllib.linalg.distributed.RowMatrix.computeCovariance(RowMatrix.scala:344)
at org.apache.spark.mllib.linalg.distributed.RowMatrix.computePrincipalComponentsAndExplainedVariance(RowMatrix.scala:387)
at org.apache.spark.mllib.feature.PCA.fit(PCA.scala:48)
at org.apache.spark.ml.feature.PCA.fit(PCA.scala:99)
at org.apache.spark.ml.feature.PCA.fit(PCA.scala:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
The Spark version is 2.2. I run Spark on YARN, and the Spark parameters are:
spark.executor.memory=32G
spark.driver.memory=32G
spark.driver.maxResultSize=32G
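(For reference, a sketch of how these settings might be passed at submission time; pca_job.py is a placeholder script name:)

```shell
spark-submit \
  --master yarn \
  --conf spark.executor.memory=32G \
  --conf spark.driver.memory=32G \
  --conf spark.driver.maxResultSize=32G \
  pca_job.py
```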
Should I reduce the number of features to run PCA, or is there another solution?
Upvotes: 1
Views: 967
Reputation: 422
I suspect you can run this with a different configuration. How many executors do you have? If you have 100 executors and each is allocated 32GB on a system with 1TB of total memory, you'll run out quickly, because together the executors try to claim 3.2TB of memory that doesn't exist. If, on the other hand, you have a single executor, 32GB is probably not enough for the task. You may find that 20 executors with 8GB each gives you enough to run the job (albeit perhaps slowly).
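To make the arithmetic concrete, here is a rough sketch in plain Python. The 8-bytes-per-double figure and a fully dense matrix are simplifying assumptions, but note that your stack trace fails inside computeGramianMatrix, which materializes a features-by-features matrix, so memory pressure grows with the square of the feature count regardless of the row count:

```python
GB = 1024 ** 3

# Total memory each configuration would try to allocate across the cluster.
heavy = 100 * 32  # 100 executors x 32 GB = 3200 GB (~3.2 TB): more than a 1 TB cluster has
lean = 20 * 8     #  20 executors x  8 GB =  160 GB: fits comfortably
print(heavy, lean)

# Size of a single dense Gramian matrix for 23,000 features (8-byte doubles).
n_features = 23_000
gramian_bytes = n_features * n_features * 8
print(round(gramian_bytes / GB, 1))  # roughly 3.9 GB for one copy of the matrix
```

That ~4GB matrix gets built, serialized, and shipped during the treeAggregate shown in the trace, which is why the failure appears in ByteArrayOutputStream rather than in your own code.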
When I have issues with a DataFrame in an ML process, I generally follow these troubleshooting steps:
1) Test the method on a trivially small DataFrame: 10 features and 1,000 rows. To avoid lineage issues, reduce the sample frame at the source, either with a "limit" clause in your SQL or by passing a smaller CSV. If the method doesn't work even at that size, the memory issue may be secondary.
2) If the method does not work on a trivially small DataFrame, start investigating the data itself. Are your features all numeric? Do any of them have null values? Non-numeric or null values in your features may cause the PCA routine to break (though not necessarily with an OutOfMemoryError).
3) If both the data and the code are well-formed, start scaling up, and keep an eye on stderr and stdout on your nodes as you proceed. To reach your nodes, you should have a utility; for example, the Cloudera distribution of Hadoop includes Cloudera Manager, which lets you drill down from Jobs to Stages to individual tasks to find the stderr.
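The sanity checks in step 2 can be sketched in plain Python over a small collected sample (e.g. the output of a limit-then-collect on your DataFrame); the sample rows below are hypothetical:

```python
import math

def check_features(rows, feature_names):
    """Report features that contain nulls or non-numeric values.

    rows is a small sample, e.g. a list of dicts collected from a DataFrame.
    """
    bad = {}
    for name in feature_names:
        for row in rows:
            value = row.get(name)
            if value is None or (isinstance(value, float) and math.isnan(value)):
                bad.setdefault(name, set()).add("null")
            elif not isinstance(value, (int, float)):
                bad.setdefault(name, set()).add("non-numeric")
    return bad

# Hypothetical sample rows standing in for a collected DataFrame slice.
sample = [
    {"f1": 1.0, "f2": 2.0, "f3": "oops"},
    {"f1": 0.5, "f2": None, "f3": 3.0},
]
print(check_features(sample, ["f1", "f2", "f3"]))
# flags f2 for a null and f3 for a non-numeric value
```

If this kind of check comes back clean on the small sample, you can rule out malformed data and focus on the memory configuration.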
Upvotes: 1