Reputation: 461
I'm building an application with Spark that runs some topic extraction algorithms. For that I first need to do some preprocessing, ending up with the document-term matrix. I got that working, but for a (not that) big collection of documents (only 2 thousand, 5 MB) the process takes forever.
While debugging, I found where the program sort of gets stuck: a reduce operation. In this part of the code I count how many times each term occurs in the collection, so first I do a "map", counting it for each RDD element, and then I "reduce", saving the result into a HashMap. The map operation is very fast, but the reduce splits the work into 40 blocks, and each block takes 5~10 minutes to process.
So I'm trying to figure out what I'm doing wrong, or whether reduce operations really are that costly.
SparkConf: standalone mode, using local[2]. I've also tried "spark://master:7077"; it worked, but with the same slowness.
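For context, the setup described above would look roughly like this (a sketch only; the application name is made up):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
    .setAppName("TopicExtraction")   // hypothetical app name
    .setMaster("local[2]");          // or "spark://master:7077" for the standalone cluster
JavaSparkContext sc = new JavaSparkContext(conf);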
Code:
"filesIn" is a JavaPairRDD where the key is the file path and the value is the content of the file. So, first the map, where I take this "filesIn", split the words, and count their frequency (in that case doesn't matter what document is) And then the reduce, where I create a HashMap (term, freq).
JavaRDD<HashMap<String, Integer>> termDF_ = filesIn.map(new Function<Tuple2<String, String>, HashMap<String, Integer>>() {
    @Override
    public HashMap<String, Integer> call(Tuple2<String, String> t) throws Exception {
        String[] allWords = t._2.split(" ");
        HashMap<String, Double> hashTermFreq = new HashMap<String, Double>();
        ArrayList<String> words = new ArrayList<String>();
        ArrayList<String> terms = new ArrayList<String>();
        HashMap<String, Integer> termDF = new HashMap<String, Integer>();
        for (String term : allWords) {
            if (hashTermFreq.containsKey(term)) {
                Double freq = hashTermFreq.get(term);
                hashTermFreq.put(term, freq + 1);
            } else {
                if (term.length() > 1) {
                    hashTermFreq.put(term, 1.0);
                    if (!terms.contains(term)) {
                        terms.add(term);
                    }
                    if (!words.contains(term)) {
                        words.add(term);
                        if (termDF.containsKey(term)) {
                            int value = termDF.get(term);
                            value++;
                            termDF.put(term, value);
                        } else {
                            termDF.put(term, 1);
                        }
                    }
                }
            }
        }
        return termDF;
    }
});
HashMap<String, Integer> termDF = termDF_.reduce(new Function2<HashMap<String, Integer>, HashMap<String, Integer>, HashMap<String, Integer>>() {
    @Override
    public HashMap<String, Integer> call(HashMap<String, Integer> t1, HashMap<String, Integer> t2) throws Exception {
        HashMap<String, Integer> result = new HashMap<String, Integer>();
        Iterator iterator = t1.keySet().iterator();
        while (iterator.hasNext()) {
            String key = (String) iterator.next();
            if (result.containsKey(key) == false) {
                result.put(key, t1.get(key));
            } else {
                result.put(key, result.get(key) + 1);
            }
        }
        iterator = t2.keySet().iterator();
        while (iterator.hasNext()) {
            String key = (String) iterator.next();
            if (result.containsKey(key) == false) {
                result.put(key, t2.get(key));
            } else {
                result.put(key, result.get(key) + 1);
            }
        }
        return result;
    }
});
Thanks!
Upvotes: 2
Views: 2607
Reputation: 330353
OK, so just off the top of my head:
- map is not executed until you call the subsequent reduce action, so what you describe as a slow reduce is most likely a slow map + reduce.
- ArrayList.contains is O(N), so all these words.contains and terms.contains calls are extremely inefficient.
- The map logic smells fishy. In particular:
  - once a term has been seen, it never reaches the else branch again, so it is only handled on its first occurrence
  - words and terms should have exactly the same content and should be equivalent to the hashTermFreq keys or the termDF keys
  - values in termDF can only take the value 1; if this is what you want and you ignore frequencies, what is the point of creating hashTermFreq?
- The reduce phase as implemented here means an inefficient linear scan with a growing object over the data, while what you really want is reduceByKey.
Using Scala as pseudocode, your whole code can be efficiently expressed as follows:
val termDF = filesIn.flatMap{
  case (_, text) =>
    text.split(" ")             // Split
      .toSet                    // Take unique terms
      .filter(_.size > 1)       // Remove single characters
      .map(term => (term, 1))}  // Map to pairs
  .reduceByKey(_ + _)           // Reduce by key

termDF.collectAsMap             // Optionally
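If you prefer to stay in Java, a rough equivalent could look like the sketch below. It assumes Java 8 lambdas and a Spark version where flatMapToPair expects an Iterator (Spark 2.x); on 1.x the function returns an Iterable instead, so adjust accordingly.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;

// Emit (term, 1) once per distinct term per document, then sum per term.
JavaPairRDD<String, Integer> termDFCounts = filesIn
    .flatMapToPair(t -> {
        HashSet<String> unique = new HashSet<>(Arrays.asList(t._2.split(" ")));
        ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<>();
        for (String term : unique) {
            if (term.length() > 1) {             // drop single characters
                pairs.add(new Tuple2<>(term, 1));
            }
        }
        return pairs.iterator();
    })
    .reduceByKey((a, b) -> a + b);               // document frequency per term

Map<String, Integer> termDF = termDFCounts.collectAsMap(); // optionally bring the result to the driver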
Finally, it looks like you're reinventing the wheel. At least some of the tools you need are already implemented in mllib.feature or ml.feature.
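For example, a term-frequency (and optionally TF-IDF) representation of the documents could be built with HashingTF and IDF from mllib.feature, roughly along the lines of the official TF-IDF example. This is only a sketch, not tuned for your specific pipeline:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.HashingTF;
import org.apache.spark.mllib.feature.IDF;
import org.apache.spark.mllib.feature.IDFModel;
import org.apache.spark.mllib.linalg.Vector;

// Tokenize each document into a list of terms.
JavaRDD<List<String>> documents = filesIn.map(t -> Arrays.asList(t._2.split(" ")));

// Hash terms into a fixed-size term-frequency vector per document.
HashingTF hashingTF = new HashingTF();
JavaRDD<Vector> tf = hashingTF.transform(documents);
tf.cache();

// Optionally re-weight by inverse document frequency.
IDFModel idfModel = new IDF().fit(tf);
JavaRDD<Vector> tfidf = idfModel.transform(tf);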
Upvotes: 2