Vijay Muvva

Reputation: 1083

Spark sessionization using data frames

I want to do clickstream sessionization on a Spark DataFrame. Let's say I have loaded a DataFrame that contains events from multiple sessions, with the following schema: [image: schema of the input events]

And I want to aggregate (stitch) the sessions, like this: [image: desired stitched output]

I have explored UDAFs and Window functions but could not understand how to use them for this specific use case. I know that partitioning the data by session id puts an entire session's data in a single partition, but how do I then aggregate the events?

The idea is to aggregate all the events of each session into a single output record.

Upvotes: 0

Views: 886

Answers (1)

Emiliano Martinez

Reputation: 4133

You can use collect_set:

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.functions.collect_set
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  def process(implicit spark: SparkSession) = {

    // Two sample events belonging to the same session
    val seq = Seq(Row(1, 1, "startTime=1549270909"), Row(1, 1, "endTime=1549270913"))

    val rdd = spark.sparkContext.parallelize(seq)

    val df1 = spark.createDataFrame(rdd, StructType(List(
      StructField("sessionId", IntegerType, false),
      StructField("userId", IntegerType, false),
      StructField("session", StringType, false))))

    // Collapse all events of a session into a single array-valued record
    df1.groupBy("sessionId").agg(collect_set("session"))
  }

That gives you:

+---------+------------------------------------------+
|sessionId|collect_set(session)                      |
+---------+------------------------------------------+
|1        |[startTime=1549270909, endTime=1549270913]|
+---------+------------------------------------------+

as output.
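Note that collect_set deduplicates its input and gives no ordering guarantee. If you need to keep duplicate events or impose a stable order, a minimal sketch using the built-in collect_list and sort_array functions (with the same df1 as above) would be:

  import org.apache.spark.sql.functions.{collect_list, sort_array}

  // collect_list keeps duplicates; sort_array makes the ordering deterministic
  df1.groupBy("sessionId").agg(sort_array(collect_list("session")))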

If you need more complex logic, it can be encapsulated in a UDAF like the following:

  import org.apache.spark.sql.Row
  import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
  import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

  class YourComplexLogicStrings extends UserDefinedAggregateFunction {
    // One string column comes in per event
    override def inputSchema: StructType = StructType(StructField("input", StringType) :: Nil)

    // The buffer holds the partially stitched string
    override def bufferSchema: StructType = StructType(StructField("pair", StringType) :: Nil)

    // The final result is a single string per session
    override def dataType: DataType = StringType

    override def deterministic: Boolean = true

    override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = ""

    // Called once per input row within a partition
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val b = buffer.getAs[String](0)
      val i = input.getAs[String](0)
      // Join with a comma so the result matches the output shown below
      buffer(0) = if (b.isEmpty) i else b + "," + i
    }

    // Called when partial results from different partitions are combined
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      val b1 = buffer1.getAs[String](0)
      val b2 = buffer2.getAs[String](0)
      buffer1(0) = if (b1.isEmpty) b2 else if (b2.isEmpty) b1 else b1 + "," + b2
    }

    // Post-process the stitched string here and return the final value
    override def evaluate(buffer: Row): Any = {
      val yourString = buffer.getAs[String](0)
      // Compute your logic and return another String
      yourString
    }
  }
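As a side note, the same UDAF can also be registered and called from SQL. A minimal sketch, assuming a hypothetical function name stitch and a temp view events:

  // "stitch" and "events" are illustrative names, not part of the original answer
  spark.udf.register("stitch", new YourComplexLogicStrings())
  df1.createOrReplaceTempView("events")
  spark.sql("SELECT sessionId, stitch(session) AS stitched FROM events GROUP BY sessionId")

Applied through the DataFrame API instead, it looks like this: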



import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

def process0(implicit spark: SparkSession) = {

  val agg0 = new YourComplexLogicStrings()

  val seq = Seq(Row(1, 1, "startTime=1549270909"), Row(1, 1, "endTime=1549270913"))

  val rdd = spark.sparkContext.parallelize(seq)

  val df1 = spark.createDataFrame(rdd, StructType(List(
    StructField("sessionId", IntegerType, false),
    StructField("userId", IntegerType, false),
    StructField("session", StringType, false))))

  // Apply the UDAF to stitch each session's events into one string
  df1.groupBy("sessionId").agg(agg0(col("session")))
}

It gives:

+---------+---------------------------------------+
|sessionId|yourcomplexlogicstrings(session)       |
+---------+---------------------------------------+
|1        |startTime=1549270909,endTime=1549270913|
+---------+---------------------------------------+

Note that you can implement quite complex logic using Spark SQL functions directly if you want to avoid a UDAF.
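For instance, a minimal sketch that reproduces the UDAF output above with built-in functions only (reusing the df1 from the earlier snippets):

  import org.apache.spark.sql.functions.{collect_list, concat_ws}

  // concat_ws joins the collected events with a comma, like the UDAF does
  df1.groupBy("sessionId").agg(concat_ws(",", collect_list("session")))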

Upvotes: 3
