dfrankow

Reputation: 21459

How to run large Mahout fuzzy kmeans clustering without running out of memory?

I am running Mahout 0.7 fuzzy k-means clustering on Amazon's EMR (AMI 2.3.1) and I am running out of memory.

Here is an invocation:

./bin/mahout fkmeans \
  --input s3://.../foo/vectors.seq \
  --output s3://.../foo/fuzzyk2 \
  --numClusters 128 \
  --clusters s3://.../foo/initial_clusters/ \
  --maxIter 20 \
  --m 2 \
  --method mapreduce \
  --distanceMeasure org.apache.mahout.common.distance.TanimotoDistanceMeasure

More details:

vectors.seq is a collection of 50225 sparse vectors (50225 things related to 124420 others), 1.2M relationships in total.

This post says to set --method mapreduce, which I am doing, and which is the default anyway.

This post says all clusters are held in memory on every mapper and reducer. That would be 4*124420=498K things, which doesn't seem too bad.

Here is the stack trace:

13/04/19 18:12:53 INFO mapred.JobClient: Job complete: job_201304161435_7034
13/04/19 18:12:53 INFO mapred.JobClient: Counters: 7
13/04/19 18:12:53 INFO mapred.JobClient:   Job Counters 
13/04/19 18:12:53 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=28482
13/04/19 18:12:53 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/04/19 18:12:53 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/04/19 18:12:53 INFO mapred.JobClient:     Rack-local map tasks=4
13/04/19 18:12:53 INFO mapred.JobClient:     Launched map tasks=4
13/04/19 18:12:53 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
13/04/19 18:12:53 INFO mapred.JobClient:     Failed map tasks=1
Exception in thread "main" java.lang.InterruptedException: Cluster Iteration 1 failed processing s3://.../foo/fuzzyk2/clusters-1
        at org.apache.mahout.clustering.iterator.ClusterIterator.iterateMR(ClusterIterator.java:186)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:288)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:221)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:110)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.main(FuzzyKMeansDriver.java:52)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:68)
        at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:139)
        at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:195)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:187)

And here is part of the mapper's log:

2013-04-19 18:10:38,734 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Received IOException while reading '.../foo/vectors.seq', attempting to reopen.
java.net.SocketTimeoutException: Read timed out
        at java.net.SocketInputStream.socketRead0(Native Method)
        at java.net.SocketInputStream.read(SocketInputStream.java:129)
        at com.sun.net.ssl.internal.ssl.InputRecord.readFully(InputRecord.java:293)
        at com.sun.net.ssl.internal.ssl.InputRecord.readV3Record(InputRecord.java:405)
        at com.sun.net.ssl.internal.ssl.InputRecord.read(InputRecord.java:360)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:798)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:755)
        at com.sun.net.ssl.internal.ssl.AppInputStream.read(AppInputStream.java:75)
        at org.apache.http.impl.io.AbstractSessionInputBuffer.read(AbstractSessionInputBuffer.java:187)
        at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:164)
        at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:138)
        at java.io.FilterInputStream.read(FilterInputStream.java:116)
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:291)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:218)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:258)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:317)
        at java.io.DataInputStream.readFully(DataInputStream.java:178)
        at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
        at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2060)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2194)
        at org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader.nextKeyValue(SequenceFileRecordReader.java:68)
        at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.nextKeyValue(MapTask.java:540)
        at org.apache.hadoop.mapreduce.MapContext.nextKeyValue(MapContext.java:67)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:143)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)
2013-04-19 18:10:38,737 INFO org.apache.hadoop.fs.s3native.NativeS3FileSystem (main): Stream for key '.../foo/vectors.seq' seeking to position '62584'
2013-04-19 18:10:42,619 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2013-04-19 18:10:42,730 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2013-04-19 18:10:42,730 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2013-04-19 18:10:42,733 FATAL org.apache.hadoop.mapred.Child (main): Error running child : java.lang.OutOfMemoryError: Java heap space
        at org.apache.mahout.math.map.OpenIntDoubleHashMap.rehash(OpenIntDoubleHashMap.java:434)
        at org.apache.mahout.math.map.OpenIntDoubleHashMap.put(OpenIntDoubleHashMap.java:387)
        at org.apache.mahout.math.RandomAccessSparseVector.setQuick(RandomAccessSparseVector.java:139)
        at org.apache.mahout.math.AbstractVector.assign(AbstractVector.java:560)
        at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:253)
        at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:241)
        at org.apache.mahout.clustering.AbstractCluster.observe(AbstractCluster.java:37)
        at org.apache.mahout.clustering.classify.ClusterClassifier.train(ClusterClassifier.java:158)
        at org.apache.mahout.clustering.iterator.CIMapper.map(CIMapper.java:55)
        at org.apache.mahout.clustering.iterator.CIMapper.map(CIMapper.java:18)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:771)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:375)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1132)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

Upvotes: 1

Views: 1584

Answers (1)

Sean Owen

Reputation: 66891

Yes, you're running out of memory. As far as I know, that "memory intensive workload" bootstrap action has long since been deprecated, so it may do nothing. See the note on that page.

A c1.xlarge should use 384 MB per mapper by default. When you subtract out all the JVM overhead, room for splits and combining, etc., you probably don't have a whole lot left.
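Back-of-the-envelope, with rough numbers (assuming the 128 cluster centers densify toward all 124420 dimensions as they observe vectors, which is what the rehash() in your stack trace suggests):

128 centers × 124420 dims ≈ 15.9M hash-map entries
15.9M entries × ~16 bytes (int key + double value + open-addressing overhead) ≈ 255 MB
rehash() transiently allocates a second, larger table on top of that

That alone sails past a 384 MB heap.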

You set Hadoop params in a bootstrap action. If you're using the console, choose the "Configure Hadoop" action instead and set something like --site-key-value mapred.child.java.opts=-Xmx1g.
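With the old elastic-mapreduce Ruby CLI, that would look something like this (a sketch: the configure-hadoop bootstrap-action path is the stock one, but the instance type and count are just illustrative, and depending on the AMI the per-file flag may be --mapred-key-value instead):

elastic-mapreduce --create --alive \
  --instance-type c1.xlarge --num-instances 4 \
  --bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
  --args "--site-key-value,mapred.child.java.opts=-Xmx1g"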

(If you're doing this programmatically and having any trouble, contact me offline; I can provide snippets from Myrrix, since it heavily tunes its EMR clusters for speed in its recommendation/clustering jobs.)

You can set mapred.map.child.java.opts instead to control mappers separately from reducers. You can also turn down the number of mappers per machine to make more room, or choose a high-memory instance. I usually find m1.xlarge is optimal for EMR given its price-to-I/O ratio, since most jobs end up being I/O-bound.
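Again as a sketch, via the same bootstrap action (the property names are the Hadoop 1.x ones, and the slot count of 2 is just an illustration):

--bootstrap-action s3://elasticmapreduce/bootstrap-actions/configure-hadoop \
--args "--site-key-value,mapred.map.child.java.opts=-Xmx1g,--site-key-value,mapred.tasktracker.map.tasks.maximum=2"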

Upvotes: 1
