Dhiogo Corrêa

Reputation: 461

Spark - Reduce operation taking too long

I'm making an application with Spark that will run some topic extraction algorithms. As a first step I need to do some preprocessing, ending up with the document-term matrix. I've managed to do that, but for a not-so-big collection of documents (only 2 thousand, 5 MB) the process is taking forever.

While debugging, I found where the program gets stuck: a reduce operation. In this part of the code I'm counting how many times each term occurs in the collection, so first I do a "map", counting it for each RDD element, and then I "reduce", saving the result in a HashMap. The map operation is very fast, but in the reduce the work is split into 40 blocks, and each block takes 5~10 minutes to process.

So I'm trying to figure out what I'm doing wrong, or whether reduce operations really are that costly.

SparkConf: standalone mode, using local[2]. I've also tried "spark://master:7077"; it worked, but with the same slowness.

Code:

"filesIn" is a JavaPairRDD where the key is the file path and the value is the content of the file. So, first the map, where I take this "filesIn", split the words, and count their frequency (in that case doesn't matter what document is) And then the reduce, where I create a HashMap (term, freq).

JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {

        @Override
        public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
            String[] allWords = t._2.split(" ");

            HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
            ArrayList<String> words = new ArrayList<String>();
            ArrayList<String> terms = new ArrayList<String>();
            HashMap<String, Integer> termDF = new HashMap<String, Integer>();

            for (String term : allWords) {

                if (hashTermFreq.containsKey(term)) {
                    Double freq = hashTermFreq.get(term);
                    hashTermFreq.put(term, freq + 1);
                } else {
                    if (term.length() > 1) {
                        hashTermFreq.put(term, 1.0);
                        if (!terms.contains(term)) {
                            terms.add(term);
                        }
                        if (!words.contains(term)) {
                            words.add(term);
                            if (termDF.containsKey(term)) {
                                int value = termDF.get(term);
                                value++;
                                termDF.put(term, value);
                            } else {
                                termDF.put(term, 1);
                            }
                        }
                    }
                }
            }
            return termDF;
        }
    });

 HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {

        @Override
        public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
            HashMap<String, Integer> result = new HashMap<String, Integer>();

            Iterator iterator = t1.keySet().iterator();

            while (iterator.hasNext()) {
                String key = (String) iterator.next();
                if (result.containsKey(key) == false) {
                    result.put(key, t1.get(key));
                } else {
                    result.put(key, result.get(key) + 1);
                }

            }

            iterator = t2.keySet().iterator();

            while (iterator.hasNext()) {
                String key = (String) iterator.next();
                if (result.containsKey(key) == false) {
                    result.put(key, t2.get(key));
                } else {
                    result.put(key, result.get(key) + 1);
                }

            }

            return result;
        }
    });

Thanks!

Upvotes: 2

Views: 2607

Answers (1)

zero323

Reputation: 330353

OK, so just off the top of my head:

  • Spark transformations are lazy. This means that map is not executed until you call the subsequent reduce action, so what you describe as a slow reduce is most likely a slow map + reduce.
  • ArrayList.contains is O(N), so all these words.contains and terms.contains calls are extremely inefficient (see the short sketch after this list).
  • The map logic smells fishy. In particular:
    • if a term has already been seen you never get into the else branch
    • at first glance words and terms should have exactly the same content and should be equivalent to the hashTermFreq keys or termDF keys
    • it looks like values in termDF can only take the value 1. If this is what you want and you ignore frequencies, what is the point of creating hashTermFreq?
  • The reduce phase as implemented here means an inefficient linear scan with a growing object over the data, while what you really want is reduceByKey.
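
A tiny illustration of the ArrayList.contains point (nothing Spark-specific, just the data structure choice; assumes the usual java.util imports and mirrors the variable names in your code):

List<String> wordsList = new ArrayList<String>();
Set<String> wordsSet = new HashSet<String>();

// contains() on an ArrayList scans every element: O(N) per call,
// so calling it once per token makes the loop roughly O(N^2) per document.
boolean seenInList = wordsList.contains("term");

// contains() on a HashSet is a hash lookup: O(1) on average.
boolean seenInSet = wordsSet.contains("term");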

Using Scala as pseudocode, your whole code can be expressed efficiently as follows:

val termDF = filesIn
  .flatMap { case (_, text) =>
    text.split(" ")              // split on spaces
      .toSet                     // take unique terms per document
      .filter(_.size > 1)        // remove single characters
      .map(term => (term, 1))    // map to (term, 1) pairs
  }
  .reduceByKey(_ + _)            // reduce by key

termDF.collectAsMap              // optionally collect to a local Map
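
Since your code is in Java, here is a rough, untested Java translation of the same idea (written against the Spark 1.x Java API, where PairFlatMapFunction returns an Iterable; in Spark 2.x it returns an Iterator instead):

import java.util.*;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

JavaPairRDD<String, Integer> termDFCounts = filesIn
    .flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
        @Override
        public Iterable<Tuple2<String, Integer>> call(Tuple2<String, String> t) {
            // Unique terms per document, so summing the 1s gives document frequency
            Set<String> terms = new HashSet<String>(Arrays.asList(t._2.split(" ")));
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            for (String term : terms) {
                if (term.length() > 1) { // drop single characters
                    out.add(new Tuple2<String, Integer>(term, 1));
                }
            }
            return out;
        }
    })
    .reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) {
            return a + b; // sum counts per term
        }
    });

Map<String, Integer> termDF = termDFCounts.collectAsMap(); // optionally bring the result to the driver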

Finally, it looks like you're reinventing the wheel. At least some of the tools you need are already implemented in mllib.feature or ml.feature.
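
For example, CountVectorizer in ml.feature builds the document-term matrix directly. A hedged sketch (it assumes a DataFrame named docs with a string column "text"; on Spark 2.x, DataFrame becomes Dataset<Row>):

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.DataFrame;

// Split the raw text into words
Tokenizer tokenizer = new Tokenizer()
    .setInputCol("text")
    .setOutputCol("words");
DataFrame words = tokenizer.transform(docs);

// Learn the vocabulary and produce sparse document-term vectors
CountVectorizerModel cvModel = new CountVectorizer()
    .setInputCol("words")
    .setOutputCol("features")
    .fit(words);
DataFrame termMatrix = cvModel.transform(words);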

Upvotes: 2
