Reputation: 73
I have this code, which gets an RDD from Cassandra, then extracts the first and last row for each key and subtracts them.
val rdd = sc.cassandraTable("keyspace", "table")
  .select("column1", "column2", "column3", "column4", "column5")
  .as((i: String, p: String, e: String, c: Double, a: java.util.Date) => (i, (c, a, p, e)))
  .groupByKey.mapValues(v => v.toList)
  .cache
val finalValues = rdd.mapValues(v => v.head)
val initialValues = rdd.mapValues(v => v.last)
val valuesCombined = finalValues.join(initialValues)
val results = valuesCombined.map(v => (v._2._1._1 - v._2._2._1))
Is it good performance-wise or is there a better solution? I am not sure about caching the whole dataset in memory.
Upvotes: 2
Views: 1754
Reputation: 330423
groupByKey shuffles the data, so the order of the grouped values is no longer guaranteed. It is also rather expensive.
If you really want to operate on RDDs, not DataFrames, and the ordering is based on the date, you can use aggregateByKey:
import scala.math.Ordering
type Record = (String, String, String, Double, java.util.Date)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))
def minMax(x: (Record, Record), y: (Record, Record)) = {
  (RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}

rdd.aggregateByKey((maxRecord, minRecord))(
  (acc, x) => minMax(acc, (x, x)),
  minMax
)
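For completeness, a sketch of how the resulting (earliest, latest) pairs could be turned into the per-key difference of column4 that the question computes. This assumes rdd here is the keyed RDD[(String, Record)] before the groupByKey, and that "final minus initial" means latest minus earliest:
// earliest / latest are Records; ._4 is the Double column4 value
val diffs = rdd
  .aggregateByKey((maxRecord, minRecord))(
    (acc, x) => minMax(acc, (x, x)),
    minMax
  )
  .mapValues { case (earliest, latest) => latest._4 - earliest._4 }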
With DataFrames you can try something like this:
import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window

val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4", "column5")

val w = Window
  .partitionBy(partition.head, partition.tail: _*)
  .orderBy(order.head, order.tail: _*)

// Lead / lag of row number to mark the first / last row in the group
val rn_lag = lag(row_number().over(w), 1).over(w)
val rn_lead = lead(row_number().over(w), 1).over(w)

// Select the value if it is the first / last row in the group, otherwise null
val firstColumns = columns.map(
  c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
  c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))

// Add columns with first / last values
val expanded = df.select(
  partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)

// Aggregate to drop the nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*)
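If the end goal is just the per-key difference of column4, one possible follow-up (assuming the aggregated result above is assigned to a val, here called aggregated, and using the _first / _last aliases generated above) is:
val aggregated = expanded
  .groupBy(partition.map(col(_)): _*)
  .agg(aggExprs.head, aggExprs.tail: _*)

// column4_first / column4_last come from the aliases generated above
val results = aggregated
  .withColumn("column4_diff", col("column4_last") - col("column4_first"))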
There are some other ways you can solve this problem with DataFrames, including ordering over structs (sketched below), and with the Dataset API. See my answer to SPARK DataFrame: select the first row of each group.
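A rough illustration of the struct-ordering idea (assuming df, column1, column4 and column5 as above): structs compare field by field, so putting the date column first lets min / max pick the earliest / latest row per key.
import org.apache.spark.sql.functions.{col, min, max, struct}

val byStruct = df
  .groupBy(col("column1"))
  .agg(
    min(struct(col("column5"), col("column4"))).alias("first"),
    max(struct(col("column5"), col("column4"))).alias("last"))
  .select(
    col("column1"),
    (col("last.column4") - col("first.column4")).alias("column4_diff"))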
Upvotes: 2
Reputation: 37852
First - I'm assuming the all variable refers to the one named rdd? After creating it, you don't need to use join (which is costly performance-wise); you can simply map each element directly to the result you need:
val results = all.mapValues(v => v.head._1 - v.last._1).values
Now - since we've only performed a single action on the RDD, we can also get rid of the cache().
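Putting it together, a sketch of the whole pipeline without the cache and the join (keeping the question's key/value layout, where ._1 of each value is the Double column4) could look like:
val results = sc.cassandraTable("keyspace", "table")
  .select("column1", "column2", "column3", "column4", "column5")
  .as((i: String, p: String, e: String, c: Double, a: java.util.Date) => (i, (c, a, p, e)))
  .groupByKey
  .mapValues(v => v.toList)
  .mapValues(v => v.head._1 - v.last._1)
  .values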
Upvotes: 1