I am writing a stateful streaming application in which I am using mapGroupsWithState to create aggregates for groups, but I need to create the groups based on more than one column in the input row. All the examples in 'Spark: The Definitive Guide' use only one column, such as 'User' or 'Device'. I am using code similar to what's given below. How do I specify more than one field in groupByKey?
There are other challenges as well. The book says we can use updateAcrossEvents the way given below, but I get a compile-time error:

Error:(43, 65) missing argument list for method updateAcrossEvents in object Main
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing updateAcrossEvents _ or updateAcrossEvents(_,_,_,_,_) instead of updateAcrossEvents.

The offending line:

.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
Another challenge: the compiler also complains about my MyReport:

Error:(41, 12) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Help in resolving these errors would be greatly appreciated. Thanks in advance.
withEventTime
.as[MyReport]
.groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
updateAcrossEvents:
def updateAcrossEvents(tuple3: Tuple3[String, String, String], inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {
  var state: MyState = if (oldState.exists) oldState.get else MyState.getNewState(tuple3._1, tuple3._2, tuple3._3)
  for (input <- inputs) {
    state = updateWithEvent(state, input)
    oldState.update(state)
  }
  state
}
updateWithEvent:
def updateWithEvent(state: MyState, report: MyReport): MyState = {
  state.someField1 = state.someField1 ++ Array(report.getSomeField1.longValue())
  state.someField2 = state.someField2 ++ Array(report.getSomeField2.longValue())
  state
}
You could form a tuple of keys - check this code:
withEventTime
.as[MyReport]
.groupByKey(row => (row.getKeys.getKey1, row.getKeys.getKey2))
.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
.writeStream
.queryName("test_query")
.format("memory")
.outputMode("update")
.start()
Now you get one unique group for each (getKey1, getKey2) combination. You will have to change your update function's key parameter to match, as sketched below.
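Since the grouping key is now a Tuple2[String, String], the first parameter of updateAcrossEvents has to have that type. A minimal sketch of the adjusted function, assuming a hypothetical two-argument variant of MyState.getNewState (adapt it to however your state is actually constructed):

def updateAcrossEvents(keys: (String, String), inputs: Iterator[MyReport], oldState: GroupState[MyState]): MyState = {
  // Reuse the state kept for this (key1, key2) group, or create a fresh one.
  var state: MyState =
    if (oldState.exists) oldState.get
    else MyState.getNewState(keys._1, keys._2) // hypothetical two-key variant
  for (input <- inputs) {
    state = updateWithEvent(state, input)
    oldState.update(state)
  }
  state
}

The "missing argument list" error usually comes from exactly this kind of mismatch: when the key type produced by groupByKey does not match the function's first parameter, the compiler cannot eta-expand the method into the function type that mapGroupsWithState expects. If the error persists even with matching types, you can follow the compiler's own hint and pass the method explicitly as updateAcrossEvents _.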
For the second question: yes, Spark only supports primitive types and case classes by default. To get rid of this error, make sure MyReport is a case class, and import the implicits before the above code using:

import <your_spark_session_variable>.implicits._
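For example, a hypothetical MyReport shaped after the accessors used in your code (field names and types here are illustrative; nested case classes are fine, since encoders are derived recursively):

// Define the case classes at the top level rather than inside a method,
// so that Spark can derive encoders for them.
case class Keys(getKey1: String, getKey2: String)
case class MyReport(getKeys: Keys, getSomeField1: Long, getSomeField2: Long)

With these definitions and the implicits import in scope, withEventTime.as[MyReport] compiles.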