Reputation: 6338
I am trying to write a short script to cluster my data via clojure (calling Mahout classes though). I have my input data in this format (which is an output from a php script)
format: (tag) (image) (frequency)
tag_sit image_a 0
tag_sit image_b 1
tag_lorem image_a 1
tag_lorem image_b 0
tag_dolor image_a 0
tag_dolor image_b 1
tag_ipsum image_a 1
tag_ipsum image_b 1
tag_amit image_a 1
tag_amit image_b 0
... (more)
Then I write them into a SequenceFile using this script (clojure)
(ns sensei.sequence.core)
(require 'clojure.string)
(require '
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)
(with-open [reader ( *in*)]
(let [hadoop_configuration ((fn []
(let [conf (new Configuration)]
(. conf set "" "hdfs://localhost:9000/")
hadoop_fs (FileSystem/get hadoop_configuration)]
(fn [writer [index value]]
(. writer append index value)
(new Path "test/sensei")
(fn [[tag row_vector]]
(let [input_index (new Text tag)
input_vector (new VectorWritable)]
(. input_vector set row_vector)
[input_index input_vector]))
(fn [[tag photo_list]]
(let [photo_map (apply hash-map photo_list)
input_vector (new SequentialAccessSparseVector (count (vals photo_map)))]
(loop [frequency_list (vals photo_map)]
(if (zero? (count frequency_list))
[tag input_vector]
(when-not (zero? (count frequency_list))
(. input_vector set
(mod (count frequency_list) (count (vals photo_map)))
(Integer/parseInt (first frequency_list)))
(recur (rest frequency_list)))))))
(fn [result next_line]
(let [[tag photo frequency] (clojure.string/split next_line #" ")]
(update-in result [tag]
#(if (nil? %)
[photo frequency]
(conj % photo frequency)))))
(line-seq reader)))))))
Basically it turns the input into sequence file, in this format
key (Text): $tag_uri
value (VectorWritable): a vector (cardinality = number of documents) with numeric index and the respective frequency <0:1 1:0 2:0 3:1 4:0 ...>
Then I proceed to do the actual cluster with this script (by referring to this blog post)
(ns sensei.clustering.fkmeans)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)
(let [hadoop_configuration ((fn []
(let [conf (new Configuration)]
(. conf set "" "hdfs://")
input_path (new Path "test/sensei")
output_path (new Path "test/clusters")
clusters_in_path (new Path "test/clusters/cluster-0")]
(int 2)
(new EuclideanDistanceMeasure))
(new EuclideanDistanceMeasure)
(double 0.5)
(int 10)
(float 5.0)
(double 0.0)
false)) '' runSequential
However I am getting output like this
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001
java.lang.IllegalStateException: No clusters found. Check your -c path.
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(
at org.apache.hadoop.mapred.MapTask.runNewMapper(
at org.apache.hadoop.mapred.LocalJobRunner$
11/08/25 15:20:18 INFO mapred.JobClient: map 0% reduce 0%
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
at clojure.lang.Util.runtimeException(
at clojure.lang.Compiler.eval(
at clojure.lang.Compiler.load(
at clojure.lang.Compiler.loadFile(
at clojure.main$load_script.invoke(main.clj:282)
at clojure.main$script_opt.invoke(main.clj:342)
at clojure.main$main.doInvoke(main.clj:426)
at clojure.lang.RestFn.invoke(
at clojure.lang.Var.invoke(
at clojure.lang.AFn.applyToHelper(
at clojure.lang.Var.applyTo(
at clojure.main.main(
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(
at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
at clojure.lang.Compiler.eval(
... 10 more
When runSequential is set to true
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See for further details.
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty!
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(
at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(
at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
at clojure.lang.Compiler.eval(
at clojure.lang.Compiler.load(
at clojure.lang.Compiler.loadFile(
at clojure.main$load_script.invoke(main.clj:282)
at clojure.main$script_opt.invoke(main.clj:342)
at clojure.main$main.doInvoke(main.clj:426)
at clojure.lang.RestFn.invoke(
at clojure.lang.Var.invoke(
at clojure.lang.AFn.applyToHelper(
at clojure.lang.Var.applyTo(
at clojure.main.main(
I have also rewritten the fkmeans script to this form
(ns sensei.clustering.fkmeans)
(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)
(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)
(let [hadoop_configuration ((fn []
(let [conf (new Configuration)]
(. conf set "" "hdfs://localhost:9000/")
driver (new FuzzyKMeansDriver)]
(. driver setConf hadoop_configuration)
(. driver
(into-array String ["--input" "test/sensei"
"--output" "test/clusters"
"--clusters" "test/clusters/clusters-0"
"--emitMostLikely" "false"
"--numClusters" "3"
"--maxIter" "10"
"--m" "5"])))
but is still getting same error as the first initial version :/
Command Line tool runs fine
$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5
However it would not return the points when I try clusterdumper even though --clustering option exists in the previous command and --pointsDir is defined here
$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt
Mahout version used: 0.6-snapshot, clojure 1.3.0-snapshot
Please let me know if I miss out anything
Upvotes: 27
Views: 2388
Reputation: 77454
My guess is that the Mahout implementation of fuzzy-c-means needs initial clusters to start with, which you maybe did not supply?
Also it sounds a bit as if you are running single-node? Note that for single-node systems you should avoid all the Mahout/Hadoop overhead and just use a regular clustering algorithm. Hadoop/Mahout comes at quite a cost that only pays off when you can no longer process the data on a single system. It is not "map reduce" unless you do that on a large number of systems.
Upvotes: 2