user2848932

Reputation: 786

Running reduceByKey on huge data in Spark

I'm running reduceByKey in Spark. My program is the simplest Spark example:

val counts = textFile.flatMap(line => line.split(" "))
                     .repartition(20000)
                     .map(word => (word, 1))
                     .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs://...")

but it always runs out of memory...

I'm using 50 servers, with 35 executors per server and 140 GB of memory per server.

The data volume is 8 TB of documents: 20 billion documents and 1,000 billion words in total. After the reduce there will be about 100 million distinct words.

How should I configure Spark, and what values should these parameters take?

1. the number of map tasks? 20000, for example?
2. the number of reduce tasks? 10000, for example?
3. other parameters?

Upvotes: 8

Views: 3790

Answers (2)

KyBe

Reputation: 842

Did you try to use a partitioner? It can help to reduce the number of keys per node. If we suppose that each word key weighs 1 KB on average, that implies 100 GB of memory per node just for the keys. With partitioning you can roughly divide the number of keys per node by the number of nodes, reducing the amount of memory needed per node accordingly. The spark.storage.memoryFraction option mentioned by @Holden is also a key factor.
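
A minimal sketch of passing an explicit partitioner to reduceByKey could look like this; the partition count (10000) is only an illustrative value, not a tuned recommendation:

import org.apache.spark.HashPartitioner

// spread the (word, 1) pairs across a fixed number of partitions so each
// node only holds roughly total_keys / num_partitions keys during the reduce
val counts = textFile.flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey(new HashPartitioner(10000), _ + _)

counts.saveAsTextFile("hdfs://...")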

Upvotes: 0

Holden

Reputation: 7442

It would be helpful if you posted the logs, but one option would be to specify a larger number of partitions when reading in the initial text file (e.g. sc.textFile(path, 200000)) rather than repartitioning after reading. Another important thing is to make sure that your input file is splittable (some compression options make it not splittable, and in that case Spark may have to read it on a single machine, causing OOMs).

Some other options: since you aren't caching any of the data, you could reduce the amount of memory Spark sets aside for caching (controlled with spark.storage.memoryFraction), and since you are only working with tuples of strings I'd recommend using the org.apache.spark.serializer.KryoSerializer serializer.
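
A rough sketch of how these suggestions could be wired together; the app name, path, and all numbers are placeholders taken from the question, not tuned values:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("WordCount")
  // reserve less memory for caching, since nothing is cached here
  .set("spark.storage.memoryFraction", "0.1")
  // Kryo serialization for the (String, Int) tuples
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// ask for many partitions up front instead of repartitioning afterwards
val counts = sc.textFile("hdfs://...", 20000)
               .flatMap(line => line.split(" "))
               .map(word => (word, 1))
               .reduceByKey(_ + _, 10000)

counts.saveAsTextFile("hdfs://...")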

Upvotes: 5
