Reputation: 683
I am writing a Spark program that just does an aggregation by key. The program is pretty simple. My input data is only 2 GB, and it runs on a multi-core server (8 cores, 32 GB RAM) with the setting local[2], i.e. using two cores for parallelization. However, I found that the performance is pretty bad: it takes almost two hours to complete. I am using KryoSerializer, so I guess the serializer might be the cause. How can I solve this problem?
val dataPoints = SparkContextManager.textFile(dataLocation)
  .map(x => {
    val delimited = x.split(",")
    (delimited(ColumnIndices.HOME_ID_COLUMN).toLong,
     delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble)
  })

def process(step: Int): Array[(Long, List[Double])] = {
  val resultRDD = new PairRDDFunctions(dataPoints.map(x => (x._1, List[Double](x._2))))
  resultRDD.reduceByKey((x, y) => x ++ y).collect()
}
The output will be:
1, [1, 3, 13, 21, ..., 111] // The size of each list is about 4000
2, [24,34,65, 24, ..., 245]
....
Upvotes: 2
Views: 10619
Reputation: 181
For the code line:
resultRDD.reduceByKey((x, y) => x ++ y).collect()
reduceByKey(func, [numTasks]) can take a second parameter.
See the official doc: https://spark.apache.org/docs/latest/programming-guide.html#parallelized-collections
When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
Specify numTasks to repartition before the shuffle and improve shuffle performance.
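For example, adapted from the code in the question (64 is just an illustrative value; choose a number that suits your data size and core count):

// Pass an explicit number of reduce tasks as the second argument to reduceByKey.
resultRDD.reduceByKey((x, y) => x ++ y, 64).collect()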
@Josh Rosen is wrong here: using reduceByKey may be better than groupByKey. See the doc:
When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
Upvotes: 3
Reputation: 31553
Josh explains how to make it faster (and more readable) by refactoring to:
def process(): Array[(Long, Seq[Double])] = dataPoints.groupByKey().collect()
Note, you don't use step, so don't pass it. Now, the reason this should be faster is that your code calls x ++ y on a Seq, which is O(x.size), whereas groupByKey will under the hood concatenate Iterators, which is O(1). This means that if you have more partitions than you have keys, you'll be performing unnecessary Seq concatenations.
If you have 8 cores, aim for 16 - 32 partitions (2 - 4 tasks per CPU).
Another thing that could be optimized is your string splitting: if the indexes of the delimited fields you are interested in are far smaller than the total number of commas, then you are unnecessarily splitting the tail end of the string. For example, if you want indexes 0 and 4 and your string has 100 fields, you'll get a bit of a speed-up by not processing the trailing 95 fields (this speed-up will be more noticeable for cached RDDs).
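A minimal sketch of that idea using String.split's limit argument, assuming HOME_ID_COLUMN and USAGE_READING_COLUMN are both at index 4 or lower (the limit of 6 is just illustrative; it must be at least one more than the highest index you access, otherwise the last field you read would still contain the unsplit remainder):

// Split at most 5 times: indexes 0-4 are clean fields,
// index 5 (if present) holds the rest of the line unsplit.
val dataPoints = SparkContextManager.textFile(dataLocation)
  .map { line =>
    val delimited = line.split(",", 6)
    (delimited(ColumnIndices.HOME_ID_COLUMN).toLong,
     delimited(ColumnIndices.USAGE_READING_COLUMN).toDouble)
  }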
Upvotes: 1
Reputation: 13841
It looks like you're trying to write a Spark job that groups together values that are associated with the same key. PairRDDFunctions has a groupByKey operation that does this. Spark's implementation of groupByKey takes advantage of several performance optimizations to create fewer temporary objects and shuffle less data over the network (since each value won't be wrapped in a List).
If you import Spark's implicit conversions, using
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
then you won't need to manually wrap your mapped RDD with PairRDDFunctions in order to access functions like groupByKey. This doesn't have a performance impact and makes large Spark programs significantly easier to read.
Using groupByKey, I think your process function can be rewritten as
def process(step: Int): Array[(Long, Seq[Double])] = {
  dataPoints.groupByKey().collect()
}
I'd also consider increasing the degree of parallelism: both groupByKey and reduceByKey take an optional numTasks argument that controls the number of reducers; by default, Spark uses only 8 parallel tasks for groupByKey and reduceByKey. This is described in the Spark Scala Programming Guide, as well as in the Scaladoc.
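A quick sketch of what passing that argument looks like on the RDDs from the question (32 is just an illustrative value, not a recommendation):

// Group with an explicit number of reduce tasks instead of the default.
val grouped = dataPoints.groupByKey(32)

// The same optional argument exists for reduceByKey.
val reduced = dataPoints
  .map(x => (x._1, List(x._2)))
  .reduceByKey((a, b) => a ++ b, 32)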
Upvotes: 9