Paweł Szychiewicz

Reputation: 73

Subtracting values from first and last row from Cassandra in Spark

I have this code, which gets an RDD from Cassandra, then extracts the first and last row for each key and subtracts them.

val rdd = sc.cassandraTable("keyspace", "table")
    .select("column1", "column2", "column3", "column4", "column5")
    // key by column1, keep (column4, column5, column2, column3) as the value
    .as((i: String, p: String, e: String, c: Double, a: java.util.Date) => (i, (c, a, p, e)))
    .groupByKey.mapValues(v => v.toList)
    .cache

// first and last element of each group, joined back by key
val finalValues = rdd.mapValues(v => v.head)
val initialValues = rdd.mapValues(v => v.last)
val valuesCombined = finalValues.join(initialValues)

// difference of column4 between the first and the last row of each group
val results = valuesCombined.map(v => (v._2._1._1 - v._2._2._1))

Is this good performance-wise, or is there a better solution? I am not sure about caching the whole dataset in memory.

Upvotes: 2

Views: 1754

Answers (2)

zero323

Reputation: 330423

groupByKey shuffles the data, and the order of the grouped values is no longer guaranteed. It is also rather expensive.

If you really want to operate on RDDs rather than DataFrames, and the ordering is based on the date, you can use aggregateByKey:

import scala.math.Ordering

// full record ordered by the date column (column5)
type Record = (String, String, String, Double, java.util.Date)
val RecordOrd = Ordering.by[Record, java.util.Date](_._5)

// neutral elements: maxRecord can never win a min comparison,
// minRecord can never win a max comparison
val minRecord = ("", "", "", 0.0, new java.util.Date(Long.MinValue))
val maxRecord = ("", "", "", 0.0, new java.util.Date(Long.MaxValue))

def minMax(x: (Record, Record), y: (Record, Record)) = {
  (RecordOrd.min(x._1, y._1), RecordOrd.max(x._2, y._2))
}

// assumes rdd is keyed by column1 with the full record as value, i.e. RDD[(String, Record)]
rdd.aggregateByKey((maxRecord, minRecord))(
  (acc, x) => minMax(acc, (x, x)),
  minMax
)
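
If the goal is the difference of column4 between the latest and the earliest record per key, a minimal sketch of the follow-up step (assuming the (earliest, latest) pairs produced above, where column4 is the Double at position 4 of Record) could look like this:

val diffs = rdd.aggregateByKey((maxRecord, minRecord))(
  (acc, x) => minMax(acc, (x, x)),
  minMax
).mapValues { case (earliest, latest) =>
  // column4 difference; swap the operands if the opposite sign is needed
  latest._4 - earliest._4
}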

With DataFrames you can try something like this:

import org.apache.spark.sql.functions.{col, lag, lead, when, row_number, max}
import org.apache.spark.sql.expressions.Window

val partition = Seq("column1")
val order = Seq("column5")
val columns = Seq("column2", "column3", "column4", "column5")

val w = Window
  .partitionBy(partition.head, partition.tail: _*)
  .orderBy(order.head, order.tail: _*)

// Lead / lag of row number to mark first / last row in the group
val rn_lag = lag(row_number.over(w), 1).over(w)
val rn_lead = lead(row_number.over(w), 1).over(w)

// Select value if first / last row in the group otherwise null
val firstColumns = columns.map(
  c => when(rn_lag.isNull, col(c)).alias(s"${c}_first"))
val lastColumns = columns.map(
  c => when(rn_lead.isNull, col(c)).alias(s"${c}_last"))

// Add columns with first / last vals
val expanded = df.select(
  partition.map(col(_)) ++ firstColumns ++ lastColumns: _*)

// Aggregate to drop nulls
val aggExprs = expanded.columns.diff(partition).map(c => max(c).alias(c))
expanded.groupBy(partition.map(col(_)): _*).agg(aggExprs.head, aggExprs.tail: _*)

There are some other ways you can solve this problem with DataFrames, including ordering over structs and the Dataset API. See my answer to SPARK DataFrame: select the first row of each group.
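
For reference, a minimal sketch of the struct-ordering idea (assuming the same column names and a DataFrame df as above; structs compare field by field, so putting the date column first lets min / max pick the earliest and latest row per group in a single aggregation):

import org.apache.spark.sql.functions.{col, min, max, struct}

val byDate = struct(col("column5"), col("column4"))

df.groupBy("column1")
  .agg(min(byDate).alias("first"), max(byDate).alias("last"))
  .select(
    col("column1"),
    // difference of column4 between the latest and the earliest row
    (col("last.column4") - col("first.column4")).alias("column4_diff"))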

Upvotes: 2

Tzach Zohar

Reputation: 37852

First - I'm assuming the all variable refers to the one named rdd above? After creating it, you don't need to use join (which is costly performance-wise); you can simply map each element directly to the result you need:

// column4 is the first element of each value tuple
val results = all.mapValues(v => v.head._1 - v.last._1).values

Now - since we've only performed a single action on the RDD, we can also get rid of the cache().
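
Putting it together, a minimal sketch of the whole pipeline without the join and without cache (assuming the same table and column names as in the question, and keeping the question's assumption that the order within each group reflects the clustering order):

val results = sc.cassandraTable("keyspace", "table")
  .select("column1", "column2", "column3", "column4", "column5")
  .as((i: String, p: String, e: String, c: Double, a: java.util.Date) => (i, (c, a, p, e)))
  .groupByKey
  .mapValues(v => v.head._1 - v.last._1)
  .values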

Upvotes: 1
