I have this Flink program below:
object WindowedWordCount {
  val configFactory = ConfigFactory.load()

  def main(args: Array[String]) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaStream1 = env.addSource(new FlinkKafkaConsumer010[String](topic1, new SimpleStringSchema(), props))
      .assignTimestampsAndWatermarks(new TimestampExtractor)
    val kafkaStream2 = env.addSource(new FlinkKafkaConsumer010[String](topic2, new SimpleStringSchema(), props))
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    val partitionedStream1 = kafkaStream1.keyBy(jsonString => {
      extractUserId(jsonString)
    })
    val partitionedStream2 = kafkaStream2.keyBy(jsonString => {
      extractUserId(jsonString)
    })

    // Is there a way to match the userId from partitionedStream1 and partitionedStream2 in this same pattern?
    val patternForMatchingUserId = Pattern.begin[String]("start")
      .where(stream1.getUserId() == stream2.getUserId()) // I want to do something like this

    // Is there a way to pass in partitionedStream1 and partitionedStream2 to this CEP.pattern function?
    val patternStream = CEP.pattern(partitionedStream1, patternForMatchingUserId)

    env.execute()
  }
}
In the Flink program above, I have two streams, partitionedStream1 and partitionedStream2, both keyed by the userId.
I want to compare the data from both streams inside the patternForMatchingUserId pattern (similar to what I showed above). Is there a way to pass two streams to the CEP.pattern function?
Something like this:
val patternStream = CEP.pattern(partitionedStream1, partitionedStream2, patternForMatchingUserId)
Upvotes: 0
Views: 218
Reputation: 3422
You cannot pass two streams to CEP, but you can pass a single combined stream.
If both streams have the same type/schema, you can simply union them. I believe this solution matches your case.
partitionedStream1.union(partitionedStream2).keyBy(...)
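As a rough sketch of how the union could feed CEP, assuming the Scala DataStream and CEP APIs from the Flink 1.x line used in the question: the extractUserId body, the pattern step names "first" and "second", and the 10 minute window are placeholders of mine, not something from the question.

```scala
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object UnionThenCep {
  // Hypothetical stand-in for the question's extractUserId helper:
  // pulls the value of a top-level "userId" string field out of the JSON text.
  private val UserIdField = "\"userId\"\\s*:\\s*\"([^\"]+)\"".r

  def extractUserId(json: String): String =
    UserIdField.findFirstMatchIn(json).map(_.group(1)).getOrElse("unknown")

  def matchSameUser(stream1: DataStream[String],
                    stream2: DataStream[String]): PatternStream[String] = {
    // union returns a plain DataStream, so the result has to be keyed again.
    val combined = stream1.union(stream2).keyBy(json => extractUserId(json))

    // On a keyed stream the pattern is evaluated per key, so both matched
    // events already share the same userId; no explicit comparison is needed.
    val pattern = Pattern.begin[String]("first")
      .followedBy("second")
      .within(Time.minutes(10)) // assumed window, adjust to your use case

    CEP.pattern(combined, pattern)
  }
}
```

Note that after a plain union the pattern cannot tell which topic an event came from. If the match has to contain one event from each stream, you would need to tag the events with their source before combining them.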
If they have different schemas, you can transform them into one stream with some custom logic, e.g. inside a coFlatMap.
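For the different-schema case, something along these lines might work, again assuming the Flink 1.x Scala API. The Click, Purchase and UserEvent types and their fields are invented for illustration; the idea is just to map both inputs to one common type that also records where each event came from.

```scala
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical input schemas and the common type they are mapped to.
case class Click(userId: String, url: String)
case class Purchase(userId: String, amount: Double)
case class UserEvent(userId: String, source: String, payload: String)

object CombineWithCoFlatMap {
  def fromClick(c: Click): UserEvent = UserEvent(c.userId, "click", c.url)
  def fromPurchase(p: Purchase): UserEvent = UserEvent(p.userId, "purchase", p.amount.toString)

  def clickThenPurchase(clicks: DataStream[Click],
                        purchases: DataStream[Purchase]): PatternStream[UserEvent] = {
    // connect keeps the two input types separate; the CoFlatMapFunction
    // converts each side into the shared UserEvent type.
    val combined = clicks.connect(purchases)
      .flatMap(new CoFlatMapFunction[Click, Purchase, UserEvent] {
        override def flatMap1(c: Click, out: Collector[UserEvent]): Unit =
          out.collect(fromClick(c))
        override def flatMap2(p: Purchase, out: Collector[UserEvent]): Unit =
          out.collect(fromPurchase(p))
      })
      .keyBy(_.userId)

    // Keyed by userId, so both steps of a match belong to the same user;
    // the source tag ensures one event comes from each input stream.
    val pattern = Pattern.begin[UserEvent]("click")
      .where(_.source == "click")
      .followedBy("purchase")
      .where(_.source == "purchase")

    CEP.pattern(combined, pattern)
  }
}
```

The same source-tagging trick can be applied to the same-schema case with a simple map on each stream before the union.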
Upvotes: 1