Reputation: 661
I wrote data-processing code in Scala & Spark and somehow it's very slow. I suspect it's because of 'ExternalSort'. As you can see in my code below, there is no reason to sort the data, but Spark did it anyway.
I have more than 6,000,000 rows in an RDD and I'm trying to group the data by the column 'ID' (which has fewer than 20 distinct values, so each ID group would contain more than 300,000 rows).
I know it's fairly large data, but the other processing steps were not slow. Any idea what's going on?
val ListByID = allData.map { x => (x.getAs[String]("ID"), List(x)) }
  .reduceByKey { (a: List[Row], b: List[Row]) => List(a, b).flatten }

val goalData = ListByID.map { rowList =>
  val ID = rowList._1
  val list = rowList._2
  val SD = list.head.getAs[String]("SD")
  val ANOTHER_ID_CNT = list.map { row => row.getAs[String]("ANOTHER_ID") }.distinct.length
  Row(
    ID, ID, list.length,
    list.count { row => row.getAs[Int]("FLAGA") == 1 },
    list.count { row => row.getAs[Int]("FLAGB") == 1 },
    SD, ANOTHER_ID_CNT)
}
Upvotes: 0
Views: 1079
Reputation: 330263
The following part:
allData.map{...}.reduceByKey{ (a: List[Row], b: List[Row]) => List(a, b).flatten }
is just a significantly more expensive implementation of groupByKey. Not only does it put more pressure on the GC by applying map-side aggregation, it may also create a huge number of temporary objects. If a single group doesn't fit into memory, an out-of-memory error is inevitable.
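For comparison, a rough sketch of the same grouping expressed directly with groupByKey (still not a good idea for data this size, but at least it skips the useless map-side List concatenation):

val listByID = allData
  .map { x => (x.getAs[String]("ID"), x) }  // keep single rows, no List(x) wrapping
  .groupByKey()                             // RDD[(String, Iterable[Row])]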
Next, you group the data and drag along all of the fields when all you do later is counting. This could be handled easily with a simple aggregation. For example you could:
- first group by ID and ANOTHER_ID, counting FLAGA=1, FLAGB=1 and keeping a single SD, and then
- group by ID, summing FLAGA=1, FLAGB=1 and 1 (distinct ANOTHER_ID), and keeping an arbitrary SD (see the sketch below).
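A rough sketch of that two-step approach, assuming the data is also available as a DataFrame named df with the columns used above (the output column names here are made up):

import org.apache.spark.sql.functions._

// Step 1: pre-aggregate per (ID, ANOTHER_ID) pair.
val perAnotherId = df.groupBy("ID", "ANOTHER_ID").agg(
  count(lit(1)).as("row_cnt"),
  sum(when(col("FLAGA") === 1, 1).otherwise(0)).as("flaga_cnt"),
  sum(when(col("FLAGB") === 1, 1).otherwise(0)).as("flagb_cnt"),
  first("SD").as("SD")
)

// Step 2: re-aggregate per ID; each row now represents one distinct ANOTHER_ID.
val perId = perAnotherId.groupBy("ID").agg(
  sum("row_cnt").as("total_rows"),
  sum("flaga_cnt").as("flaga_total"),
  sum("flagb_cnt").as("flagb_total"),
  count(lit(1)).as("another_id_cnt"),
  first("SD").as("SD")
)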
Finally, if you start with a DataFrame, why move the data to a less efficient format at all? In pseudocode:
df.groupBy("ID").agg(
count($"*"),
count(when($"FLAGA" === 1, 1)),
count(when($"FLAGB" === 1, 1))
countDistinct("ANOTHER_ID"),
first("SD")
)
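A slightly more concrete (still hypothetical) version with aliases for the output columns, again assuming a DataFrame named df:

import org.apache.spark.sql.functions._

val goalData = df.groupBy("ID").agg(
  count(lit(1)).as("ROW_CNT"),
  count(when(col("FLAGA") === 1, 1)).as("FLAGA_CNT"),   // when(...) is null unless FLAGA=1, so count ignores other rows
  count(when(col("FLAGB") === 1, 1)).as("FLAGB_CNT"),
  countDistinct("ANOTHER_ID").as("ANOTHER_ID_CNT"),
  first("SD").as("SD")
)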
Upvotes: 1