Reputation: 2661
I've set the timeout duration to "2 minutes" as follows:
def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
oldState: GroupState[MyState]): OutputRow = {
println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)
if (oldState.hasTimedOut) {
println("@@@@@ oldState has timed out @@@@")
// Logic to Write OutputRow
OutputRow("some values here...")
} else {
for (input <- inputs) {
state = updateWithEvent(state, input)
oldState.update(state)
oldState.setTimeoutDuration("2 minutes")
}
OutputRow(null, null, null)
}
}
I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows...
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?
Upvotes: 2
Views: 234
Reputation: 2661
It seems it only works if input data is continuously flowing. I had stopped the input job because I had enough data but it seems timeouts work only if the data is continuously fed. Not sure why it's designed that way. Makes it a bit harder to write unit/integration tests BUT I am sure there's a reason why it's designed this way. Thanks.
Upvotes: 1