Reputation: 2786
I have two streams: a data stream that just contains a flag to set the ValueState passing=true/false, and a control stream that adds a user to be notified to a MapState. When passing changes from false to true, a notification is issued to the users present in the MapState who have not yet been notified.
Here's how the state transitions.
Here is the KeyedCoProcessFunction that handles this logic.
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.Collector

class TestKeyedCoProcessFunction extends KeyedCoProcessFunction[String, String, String, String] {
  // Types.BOOLEAN describes java.lang.Boolean, so both states use the boxed type.
  @transient private var notified: MapState[String, java.lang.Boolean] = _ // user:was_notified
  @transient private var passing: ValueState[java.lang.Boolean] = _ // issue notification when updated to passing=true

  override def open(parameters: Configuration): Unit = {
    val notifiedDescriptor = new MapStateDescriptor("notified", Types.STRING, Types.BOOLEAN)
    notified = getRuntimeContext.getMapState(notifiedDescriptor)
    val passingDescriptor = new ValueStateDescriptor("passing", Types.BOOLEAN)
    passing = getRuntimeContext.getState(passingDescriptor)
    // Keyed state cannot be read or written here: open() runs without a current key.
    // An unset flag is treated as false by isPassing instead.
  }

  // passing.value() is null until the first update for the current key.
  private def isPassing: Boolean = Option(passing.value()).exists(_.booleanValue)

  // Registers the user as not yet notified; returns true if the user was new.
  def addUser(user: String): Boolean = {
    if (notified.contains(user)) {
      false
    } else {
      notified.put(user, false)
      true
    }
  }

  // Updates the flag; returns true if its value changed.
  def setPassing(newPassing: String): Boolean = {
    if (isPassing) {
      if (newPassing == "true") {
        false
      } else {
        passing.update(false)
        true
      }
    } else {
      if (newPassing == "false") {
        false
      } else {
        passing.update(true)
        true
      }
    }
  }

  // Emits a notification for every user not yet notified and marks them as notified.
  def notifyNotNotifiedUsers(collector: Collector[String]): Unit = {
    val keys = notified.keys().iterator()
    while (keys.hasNext) {
      val user = keys.next()
      val userNotified = notified.get(user)
      if (!userNotified) {
        collector.collect("Hey " + user + " passing=true")
        notified.put(user, true)
      }
    }
  }

  // Resets every user to not notified, so the next false -> true change notifies them again.
  def setNotifiedFalseAll(): Unit = {
    val keys = notified.keys().iterator()
    while (keys.hasNext) {
      val user = keys.next()
      val userNotified = notified.get(user)
      if (userNotified) {
        notified.put(user, false)
      }
    }
  }

  // First input: users to be notified.
  override def processElement1(user: String,
                               context: KeyedCoProcessFunction[String, String, String, String]#Context,
                               collector: Collector[String]): Unit = {
    addUser(user)
    if (isPassing) {
      notifyNotNotifiedUsers(collector)
    }
  }

  // Second input: the passing flag as "true" or "false".
  override def processElement2(newPassing: String,
                               context: KeyedCoProcessFunction[String, String, String, String]#Context,
                               collector: Collector[String]): Unit = {
    val modified = setPassing(newPassing)
    if (isPassing) {
      notifyNotNotifiedUsers(collector)
    } else {
      if (modified) {
        setNotifiedFalseAll()
      }
    }
  }
}
Is it possible for a race condition to occur in Flink where processElement1 and processElement2 are executed simultaneously? For example:
t+1 processElement2("true")
t+2 processElement2: setPassing("true")
t+3 processElement2: notifyNotNotifiedUsers() // starts iteration on MapState
t+4 processElement1("new_user")
t+5 processElement1: addUser(user) // adds user to MapState
t+6 processElement1: notifyNotNotifiedUsers() // starts another parallel iteration on MapState resulting in maybe missed/duplicate notification
Upvotes: 1
Views: 481
Reputation: 43707
There's no possibility of a race condition in any given instance of a KeyedCoProcessFunction, or in any of Flink's user function interfaces, for that matter. processElement1 and processElement2 cannot be executed concurrently. onTimer is safe as well.
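To make the serialized execution concrete, here is a minimal, Flink-free sketch of the state machine from the question. NotifierModel, onUser, and onPassing are hypothetical names, not Flink APIs; a LinkedHashMap and a var stand in for MapState and ValueState, and the flag is a Boolean rather than the String used in the question. Since each call runs to completion before the next one starts, the interleaving at t+3 through t+6 cannot happen, and each user is notified exactly once.

```scala
import scala.collection.mutable

// Flink-free model of the per-key state (hypothetical class, for illustration only).
final class NotifierModel {
  private val notified = mutable.LinkedHashMap.empty[String, Boolean] // stands in for MapState
  private var passing = false                                          // stands in for ValueState
  val out = mutable.ArrayBuffer.empty[String]                          // stands in for Collector

  // Notify every user that has not been notified yet, then mark them as notified.
  private def notifyPending(): Unit =
    for ((user, done) <- notified.toList if !done) {
      out += s"Hey $user passing=true"
      notified(user) = true
    }

  // Mirrors processElement1: register the user, notify at once if already passing.
  def onUser(user: String): Unit = {
    if (!notified.contains(user)) notified(user) = false
    if (passing) notifyPending()
  }

  // Mirrors processElement2: apply the flag, notify on true, reset on a true -> false change.
  def onPassing(newPassing: Boolean): Unit = {
    val modified = passing != newPassing
    passing = newPassing
    if (passing) notifyPending()
    else if (modified) notified.keys.toList.foreach(u => notified(u) = false)
  }
}

object NotifierModelDemo extends App {
  val m = new NotifierModel
  m.onUser("alice")
  m.onPassing(true)    // finishes (and notifies alice) before the next call starts
  m.onUser("new_user") // sees passing=true and alice already marked as notified
  println(m.out.mkString("\n"))
}
```

Running the demo prints one line for alice followed by one line for new_user, with no duplicates and no missed user.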
Upvotes: 4