Reputation: 484
I have two streams of events: one emits an event to signal the start of an item's lifetime, and the other emits an event to signal the end of an item's lifetime. (The streams can be joined on an itemId.)
How can I emit a new event in Flink for each itemId that has only an "end of lifetime" event and no corresponding "start of lifetime" event? (The start and end events could be hours or days apart.)
Upvotes: 0
Views: 215
Reputation: 18987
You can implement this with a stateful FlatMapFunction on a KeyedStream.
The following code snippet should do pretty much what you are looking for.
// Scala DataStream API plus the implicit TypeInformation it requires
import org.apache.flink.streaming.api.scala._

val stream1: DataStream[Event1] = ???
val stream2: DataStream[Event2] = ???

// map both streams to their ID and an isStart flag to have a common type
val ids1: DataStream[(Int, Boolean)] = stream1.map(e => (e.id, true))
val ids2: DataStream[(Int, Boolean)] = stream2.map(e => (e.id, false))

// union both streams
val ids = ids1.union(ids2)

// emit the IDs of eol events that were not preceded by a bol event
val onlyEOL: DataStream[Int] = ids
  // organize stream by ID
  .keyBy(_._1)
  // use a stateful flatMap to check that bol arrived before eol
  .flatMapWithState {
    (value: (Int, Boolean), state: Option[Boolean]) =>
      if (value._2) {
        // bol event -> emit nothing and set state to true
        (List(), Some(true))
      } else {
        // eol event
        if (state.contains(true)) {
          // bol was seen before -> emit nothing and remove state
          (List(), None)
        } else {
          // bol was NOT seen before -> emit ID and remove state
          (List(value._1), None)
        }
      }
  }
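To check the transition logic without starting a Flink job, the same per-key rule can be modelled in plain Scala. This is only a sketch: the names EolWithoutBol, step, and run are made up for illustration and are not part of Flink, and the Map stands in for the keyed state that keyBy would provide.

```scala
// Plain-Scala model of the per-key rule used in flatMapWithState.
// EolWithoutBol, step and run are hypothetical names, not Flink API.
object EolWithoutBol {
  // Given one (id, isStart) event and the key's current state,
  // return the IDs to emit and the key's new state.
  def step(value: (Int, Boolean), state: Option[Boolean]): (List[Int], Option[Boolean]) =
    if (value._2) (Nil, Some(true))             // bol: remember it, emit nothing
    else if (state.contains(true)) (Nil, None)  // eol after bol: clear state
    else (List(value._1), None)                 // eol without bol: emit the ID

  // Replay events in arrival order, keeping one state slot per ID
  // (an assumption that mimics keyed state, nothing more).
  def run(events: Seq[(Int, Boolean)]): List[Int] = {
    val (_, emitted) =
      events.foldLeft((Map.empty[Int, Boolean], List.empty[Int])) {
        case ((states, out), ev) =>
          val (emit, next) = step(ev, states.get(ev._1))
          val updated = next match {
            case Some(s) => states.updated(ev._1, s)
            case None    => states - ev._1
          }
          (updated, out ++ emit)
      }
    emitted
  }
}
```

For example, EolWithoutBol.run(Seq((1, true), (1, false), (2, false))) yields List(2). Note that the rule depends on arrival order: an eol event that reaches the operator before its bol event is reported as unmatched.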
Upvotes: 1