DilTeam

Reputation: 2661

Spark Streaming with mapGroupsWithState

I am writing a stateful streaming application in which I use mapGroupsWithState to create aggregates for groups, but I need to create the groups based on more than one column in the input row. All the examples in 'Spark: The Definitive Guide' use only one column, such as 'User' or 'Device'. I am using code similar to what is given below. How do I specify more than one field in groupByKey?

There are other challenges as well. The book says we can use 'updateAcrossEvents' as shown below, but I get a compile-time error:

 Error:(43, 65) missing argument list for method updateAcrossEvents in object Main
 Unapplied methods are only converted to functions when a function type is expected.
 You can make this conversion explicit by writing updateAcrossEvents _ or updateAcrossEvents(_,_,_,_,_) instead of updateAcrossEvents.
       .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)

Another challenge: the compiler also complains about MyReport:

 Error:(41, 12) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Help in resolving these errors would be greatly appreciated. Thanks in advance.

withEventTime
 .as[MyReport]
 .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

updateAcrossEvents:

def updateAcrossEvents(tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {

  // Reuse the existing state for this key, or create a fresh one.
  var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)

  // Fold each incoming report into the state and persist it.
  for (input <- inputs) {
    state = updateWithEvent(state, input)
    oldState.update(state)
  }

  state
}

updateWithEvent:

def updateWithEvent(state: MyState, report: MyReport): MyState = {

  state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
  state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())

  state
}

Upvotes: 4

Views: 4121

Answers (1)

Suhas NM

Reputation: 1080

You could form a tuple of keys - check this code:

withEventTime
 .as[MyReport]
 .groupByKey(row => (row.getKeys.getKey1, row.getKeys.getKey2))
 .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
 .writeStream
 .queryName("test_query")
 .format("memory")
 .outputMode("update")
 .start()

Now you get one unique group for each (getKey1, getKey2) combination. You may have to change your update function accordingly.
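As a minimal sketch (not a complete solution), the update function could take the two-field tuple as its key; MyState.getNewState is assumed here to have, or be given, a matching two-argument variant, and MyReport, MyState and updateWithEvent are the same as in the question:

 import org.apache.spark.sql.streaming.GroupState

 def updateAcrossEvents(key: (String, String), inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {

   // The key now matches the (getKey1, getKey2) tuple produced by groupByKey.
   var state: MyState =
     if (oldState.exists) oldState.get
     else MyState.getNewState(key._1, key._2) // assumed two-argument variant

   // Fold each report for this key into the state and persist it.
   for (input <- inputs) {
     state = updateWithEvent(state, input)
     oldState.update(state)
   }

   state
 }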

For the second question:

Yes, Spark only supports case classes and primitive types by default.

To get rid of this error, make sure MyReport is a case class and import the implicits before the above code using:

import <your_spark_session_variable>.implicits._
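As a rough sketch of how that fits together (the field names and session setup below are placeholders, not your actual schema):

 import org.apache.spark.sql.SparkSession

 // Hypothetical shape of the input rows; replace the fields with your real schema.
 case class Keys(getKey1: String, getKey2: String)
 case class MyReport(getKeys: Keys, getSomeField1: Long, getSomeField2: Long)

 val spark = SparkSession.builder().appName("my-app").getOrCreate()
 import spark.implicits._ // provides the Encoder that .as[MyReport] needs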

Upvotes: 2
