Reputation: 502
I read that reduceByKey is a better option on large datasets, reducing the amount of data shuffled and, in this way, enhancing performance.
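For context, a minimal word-count sketch (hypothetical data, not my actual pipeline) of the difference between the two on plain RDDs:
val pairs = spark.sparkContext.parallelize(Seq("a", "b", "a")).map(w => (w, 1))

// groupByKey shuffles every (word, 1) record across the network before summing.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey sums within each partition first (map-side combine), so fewer records are shuffled.
val viaReduce = pairs.reduceByKey(_ + _)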
I am trying to convert my use of groupByKey. First, it has to be converted to an RDD:
val linksNew = links.map(convertToRelationship)
  .flatMap(bidirRelationship)
links is a Dataset, and the Dataset API doesn't have reduceByKey. What is the equivalent of .groupByKey(_._1) when using reduceByKey?
val linksfinal = linksNew.rdd.reduceByKey(???)
Actual code:
val biLinks = links
.map(convertToRelationship)
.flatMap(bidirRelationship)
.groupByKey(_._1)
.reduceGroups((left, right) => combineBidirerRelationships(left,right))
.map(_._2._2)
Schema of the dataset, just before using the groupByKey(_._1):
Some of the actual data in the dataset:
Upvotes: 0
Views: 827
Reputation: 28392
I am not sure whether it is more efficient, but it should be possible to convert to reduceByKey, since you perform a reduceGroups directly after the groupByKey. A short example using part of the provided code:
val biLinks = links
  .map(convertToRelationship)
  .flatMap(bidirRelationship)
  .rdd
  .map{row => (row.getAs[String](0), row.getAs[Relationship](1))} // see the struct explanation below; adapt to the actual schema
  .reduceByKey((left, right) => combineBidirerRelationships(left, right))
  .map(_._2) // reduceByKey already returns (key, value) pairs, so a single _._2 keeps the value
Depending on what the dataframe looks like after the .rdd call, an additional conversion could be required. When converting from a dataframe, the resulting RDD will be an RDD[Row]. However, for reduceByKey() to work, a tuple RDD of type RDD[(A, B)] is required, where A and B are the key and value types (they can themselves be tuples).
A short example of how the rdd.map(...) conversion can work with structs. Note that a nested struct column comes back as a Row, not as the case class, so the value has to be rebuilt from its fields:
case class Relationship(a: Long, b: Long)
val df = spark.createDataFrame(Seq(("1", Relationship(3L, 2L)), ("2", Relationship(20L, 7L)))).toDF()
val rdd = df.rdd.map { row =>
  val rel = row.getStruct(1) // the struct column arrives as a Row
  (row.getString(0), Relationship(rel.getLong(0), rel.getLong(1)))
}
This gives the required tuple RDD type, here RDD[(String, Relationship)].
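With the tuple RDD in place, reduceByKey applies directly; continuing the example with a placeholder merge function instead of the real combineBidirerRelationships:
// Placeholder merge logic for the illustration; the result is RDD[(String, Relationship)].
val combined = rdd.reduceByKey((l, r) => Relationship(l.a + r.a, l.b + r.b))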
Upvotes: 1
Reputation: 1
I read that reduceByKey is a better option on large datasets to reduce shuffles, or shuffles on the reduce side, and enhance performance.
It is not. You are confusing this with the "old" RDD API, where groupByKey has different semantics.
In the Dataset API, groupByKey + reduceGroups uses a similar execution model to reduceByKey in the old API. In fact, the conversion to an RDD uses a less efficient shuffle mechanism and is very costly, so you are just making things worse.
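To make that concrete, a minimal sketch of staying in the Dataset API (with a hypothetical Relationship case class and a placeholder merge function standing in for combineBidirerRelationships):
import org.apache.spark.sql.SparkSession

case class Relationship(a: Long, b: Long)

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Placeholder merge logic standing in for combineBidirerRelationships.
def combine(l: Relationship, r: Relationship): Relationship =
  Relationship(math.min(l.a, r.a), math.max(l.b, r.b))

val ds = Seq(("x", Relationship(3L, 2L)), ("x", Relationship(2L, 3L))).toDS()

val reduced = ds
  .groupByKey(_._1)                                    // logical grouping only; nothing is materialized
  .reduceGroups((l, r) => (l._1, combine(l._2, r._2))) // runs with partial (map-side) aggregation
  .map(_._2._2)                                        // keep just the combined Relationship
Here Spark plans the reduceGroups as a partial aggregation before the shuffle, which is exactly what reduceByKey does in the RDD API.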
Upvotes: 1