Jack

Reputation: 5890

OutOfMemoryError when running Spark MLlib KMeans

I always get an OutOfMemoryError when I run Spark KMeans on a big data set. The training set is about 250 GB, and I have a 10-node Spark cluster, each machine with 16 CPUs and 150 GB of memory. I give the job 100 GB of memory on each node and 50 CPUs in total. I set the number of cluster centers to 100 and the number of iterations to 5. I get the OutOfMemoryError when the code reaches the following line:

val model = KMeans.train(parsedData, numClusters, numIterations)

Is there any parameter I can tune to fix this problem?

If I set a smaller number of cluster centers or iterations, it works fine.

My code is as follows:

val originalData = sc.textFile("hdfs://host/input.txt").cache()
// Each line is "<comma-separated features>:<label>"
val tupleData = originalData.map { line =>
  val parts = line.split(":")
  (parts(0), parts(1))
}
val parsedData = tupleData.map { case (features, _) =>
  Vectors.dense(features.split(',').map(_.toDouble))
}

val model = KMeans.train(parsedData, numClusters, numIterations, 1, initializationMode = KMeans.RANDOM)
val resultRdd = tupleData.map { case (features, label) =>
  (model.predict(Vectors.dense(features.split(',').map(_.toDouble))), label)
}
resultRdd.sortByKey(true, 1).saveAsTextFile("hdfs://host/output.txt")

My input format is as follows:

0.0,0.0,91.8,21.67,0.0 ... (each row has 100K elements)
1.1,1.08,19.8,0.0,0.0 ... 
0.0,0.08,19.8,0.0,0.0 ...
...
The number of rows is 600K.

The exception I got is as follows:

scheduler.DAGScheduler: Submitting ShuffleMapStage 42 (MapPartitionsRDD[49] at map at KmeansTest.scala:47), which has no missing parents
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)

Upvotes: 0

Views: 782

Answers (1)

Kien Truong

Reputation: 11381

By default, Spark's KMeans implementation uses the K_MEANS_PARALLEL (k-means||) initialization mode. Part of this mode runs on the driver machine and can be extremely slow or cause an OOM on the driver, depending on your data.

Try switching to the RANDOM initialization mode:

val model = KMeans.train(parsedData, numClusters, numIterations, 1, initializationMode = KMeans.RANDOM)

Another thing to try is increasing the driver memory when you submit your application. For example, use the following command to set the driver memory to 4 GB:

spark-submit --conf "spark.driver.memory=4g" ...
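Equivalently, spark-submit has a dedicated flag for this; a minimal sketch (the class name and jar are placeholders for your own application, and 4g is just an example value to size against your cluster):

```shell
# --driver-memory 4g is equivalent to --conf "spark.driver.memory=4g".
# In client mode it must be set on the command line (or in
# spark-defaults.conf), not inside the application via SparkConf,
# because the driver JVM has already started by the time your code runs.
spark-submit \
  --driver-memory 4g \
  --class KmeansTest \
  kmeans-test.jar
```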

Upvotes: 6
