user1441849

Reputation: 245

Spark: groupBy taking lot of time

In my application, when taking performance numbers, groupBy is eating away a lot of time.

My RDD is of the below structure:

JavaPairRDD<CustomTuple, Map<String, Double>>

CustomTuple: This object contains information about the current row in the RDD, like which week, month, city, etc.

public class CustomTuple implements Serializable {

    private Map hierarchyMap = null;
    private Map granularMap  = null;
    private String timePeriod = null;
    private String sourceKey  = null;
}

Map

This map contains the statistical data about that row, like how much investment, how many GRPs, etc.

<"Inv", 20>

<"GRP", 30>

I was executing the below DAG on this RDD:

  1. apply filter on this RDD and scope out relevant rows : Filter
  2. apply filter on this RDD and scope out relevant rows : Filter
  3. Join the RDDs: Join
  4. apply map phase to compute investment: Map
  5. apply GroupBy phase to group the data according to the desired view: GroupBy
  6. apply a map phase to aggregate the data as per the grouping achieved in above step (say view data across timeperiod) and also create new objects based on the resultset desired to be collected: Map
  7. collect the result: Collect
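
Simplified, the pipeline looks roughly like this (an untested sketch only; rdd1, rdd2, isRelevant() and getTimePeriod() are placeholders, not my actual code):

// Rough sketch of the DAG above (Java 8 lambdas, Spark Java API).
// rdd1 and rdd2 are the two input JavaPairRDD<CustomTuple, Map<String, Double>>;
// isRelevant() is a placeholder predicate and getTimePeriod() an assumed getter on CustomTuple.

// steps 1 and 2: scope out relevant rows
JavaPairRDD<CustomTuple, Map<String, Double>> f1 = rdd1.filter(t -> isRelevant(t._1()));
JavaPairRDD<CustomTuple, Map<String, Double>> f2 = rdd2.filter(t -> isRelevant(t._1()));

// step 3: join the two RDDs on CustomTuple
JavaPairRDD<CustomTuple, Tuple2<Map<String, Double>, Map<String, Double>>> joined = f1.join(f2);

// step 4: compute investment per row, keyed by the desired view (time period here)
JavaPairRDD<String, Double> investment = joined.mapToPair(t -> new Tuple2<>(
        t._1().getTimePeriod(),
        t._2()._1().getOrDefault("Inv", 0.0) + t._2()._2().getOrDefault("Inv", 0.0)));

// step 5: group by the view key
JavaPairRDD<String, Iterable<Double>> grouped = investment.groupByKey();

// steps 6 and 7: aggregate each group and collect
List<Tuple2<String, Double>> view = grouped.mapToPair(g -> {
    double sum = 0.0;
    for (Double v : g._2()) sum += v;
    return new Tuple2<>(g._1(), sum);
}).collect();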

So if the user wants to view investment across time periods, then the below list is returned (this was achieved in step 4 above):

<timeperiod1, value> 

When I checked the time taken by each operation, GroupBy was taking 90% of the time needed to execute the whole DAG.

IMO, we can replace the GroupBy and subsequent Map operations by a single reduce. But reduce will work on objects of type JavaPairRDD<CustomTuple, Map<String, Double>>. So my reduce will be like T reduce((T, T) => T), where T will be <CustomTuple, Map>.

Or maybe after step 3 in the above DAG I run another map function that returns me an RDD of the right type for the metric that needs to be aggregated, and then run a reduce.

Also, I am not sure how the aggregate function works and whether it will be able to help me in this case.
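
To illustrate what I mean, something like this (untested, reusing the investment RDD from step 4 of the sketch above):

// Replace groupBy (step 5) and the aggregation map (step 6) with a single reduceByKey.
// reduceByKey combines values per key on the map side before shuffling.
List<Tuple2<String, Double>> result = investment.reduceByKey((a, b) -> a + b).collect();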

Secondly, my application will receive requests on varying keys. In my current RDD design, each request would require me to repartition or re-group my RDD on this key. This means that for each request, grouping/re-partitioning would take 95% of the time needed to compute the job. For example, a request to view data across markets would return:

<"market1", 20>
<"market2", 30>

This is very discouraging, as the current performance of the application without Spark is 10 times better than the performance with Spark.

Any insight is appreciated.

[EDIT] We also noticed that JOIN was taking a lot of time. Maybe that's why groupBy was taking time. [EDIT]

TIA!

Upvotes: 1

Views: 8338

Answers (2)

YoYo

Reputation: 9405

Shuffling is triggered by any change in the key of a [K,V] pair, or by a repartition() call. The partitioning is calculated based on the K (key) value. By default, partitioning is calculated using the hash value of your key, as implemented by the hashCode() method. In your case, your key contains two Map instance variables. A hashCode() implementation that includes all fields (as a generated one typically does) has to calculate the hashCode() of those maps as well, causing an iteration over all their elements to in turn calculate the hashCode() of each element.

The solutions are:

  1. Do not include the Map instances in your key. Having maps in a key seems highly unusual anyway.
  2. Implement and override your own hashCode() that avoids going through the Map instance variables (see the sketch after this list).
  3. Possibly you can avoid using the Map objects completely. If they hold something that is shared amongst multiple elements, you might need to consider using broadcast variables in Spark. The overhead of serializing your Maps during shuffling might also be a big contributing factor.
  4. Avoid any shuffling by tuning your hashing between two consecutive group-bys.
  5. Keep shuffling node-local by choosing a Partitioner that has an affinity for keeping partitions local during consecutive use.
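
As an illustration of option 2, a minimal sketch, assuming that timePeriod and sourceKey are enough to identify a row (that is an assumption about your data, not something I can know):

public class CustomTuple implements Serializable {

    private Map hierarchyMap = null;
    private Map granularMap  = null;
    private String timePeriod = null;
    private String sourceKey  = null;

    // The Maps are deliberately left out, so hashing never iterates over their entries.
    @Override
    public int hashCode() {
        return Objects.hash(timePeriod, sourceKey);
    }

    // equals() must stay consistent with hashCode(), so it ignores the Maps as well.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CustomTuple)) return false;
        CustomTuple other = (CustomTuple) o;
        return Objects.equals(timePeriod, other.timePeriod)
                && Objects.equals(sourceKey, other.sourceKey);
    }
}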

Good reading on hashCode(), including a reference to quotes by Josh Bloch, can be found in the wiki.

Upvotes: 0

jlopezmat

Reputation: 930

Spark's documentation encourages you to avoid groupBy operations; instead it suggests combineByKey or one of its derived operations (reduceByKey or aggregateByKey). You have to use these operations in order to perform an aggregation both before and after the shuffle (in the map and in the reduce phase, if we use Hadoop terminology), so your execution times will improve (I don't know if it will be 10 times better, but it has to be better).

If I understand your processing correctly, I think that you can use a single combineByKey operation. The following explanation is written for Scala code, but you can translate it to Java code without too much effort.

combineByKey has three arguments: combineByKey[C](createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

  • createCombiner: In this operation you create a new class in order to combine your data, so you could aggregate your CustomTuple data into a new class, CustomTupleCombiner (I don't know if you only want to make a sum, or maybe you want to apply some process to this data, but either option can be done in this operation)

  • mergeValue: In this operation you have to describe how a CustomTuple is summed into a CustomTupleCombiner (again, I am presupposing a simple summing operation). For example, if you want to sum the data by key, you will have a Map in your CustomTupleCombiner class, so the operation should be something like CustomTupleCombiner.sum(CustomTuple), which does CustomTupleCombiner.Map(CustomTuple.key) -> CustomTuple.Map(CustomTuple.key) + CustomTupleCombiner.value

  • mergeCombiners: In this operation you have to define how to merge two combiner classes, CustomTupleCombiner in my example. So this will be something like CustomTupleCombiner1.merge(CustomTupleCombiner2), which will be something like CustomTupleCombiner1.Map.keys.foreach(k -> CustomTupleCombiner1.Map(k) + CustomTupleCombiner2.Map(k)), or something like that
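
As a rough, untested Java sketch of the three functions, assuming you simply want to sum the Map<String, Double> metric values per key of your RDD (CustomTupleCombiner is just the name I use for the combiner here):

// A combiner that keeps a running sum per metric ("Inv", "GRP", ...).
public class CustomTupleCombiner implements Serializable {

    private final Map<String, Double> sums = new HashMap<>();

    // mergeValue: add one metric map into the running sums
    public CustomTupleCombiner add(Map<String, Double> value) {
        for (Map.Entry<String, Double> e : value.entrySet()) {
            sums.merge(e.getKey(), e.getValue(), Double::sum);
        }
        return this;
    }

    // mergeCombiners: merge another combiner into this one
    public CustomTupleCombiner merge(CustomTupleCombiner other) {
        for (Map.Entry<String, Double> e : other.sums.entrySet()) {
            sums.merge(e.getKey(), e.getValue(), Double::sum);
        }
        return this;
    }

    public Map<String, Double> getSums() {
        return sums;
    }
}

// rdd is your JavaPairRDD<CustomTuple, Map<String, Double>> from the question.
JavaPairRDD<CustomTuple, CustomTupleCombiner> combined = rdd.combineByKey(
        v -> new CustomTupleCombiner().add(v),   // createCombiner
        (c, v) -> c.add(v),                      // mergeValue
        (c1, c2) -> c1.merge(c2));               // mergeCombiners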

The pasted code is not tested (it probably will not even compile, because I wrote it in vim), but I think it might work for your scenario.

I hope this will be useful.

Upvotes: 6
