user5228393

How can I apply the same Pattern on two different Kafka Streams in Flink?

I have this Flink program below:

object WindowedWordCount {
  val configFactory = ConfigFactory.load()

  def main(args: Array[String]) = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaStream1 = env.addSource(new FlinkKafkaConsumer010[String](topic1, new SimpleStringSchema(), props))
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    val kafkaStream2 = env.addSource(new FlinkKafkaConsumer010[String](topic2, new SimpleStringSchema(), props))
      .assignTimestampsAndWatermarks(new TimestampExtractor)

    val partitionedStream1 = kafkaStream1.keyBy(jsonString => {
      extractUserId(jsonString)
    })

    val partitionedStream2 = kafkaStream2.keyBy(jsonString => {
      extractUserId(jsonString)
    })

    //Is there a way to match the userId from partitionedStream1 and partitionedStream2 in this same pattern?
    val patternForMatchingUserId = Pattern.begin[String]("start")
        .where(stream1.getUserId() == stream2.getUserId()) //I want to do something like this

    //Is there a way to pass in partitionedStream1 and partitionedStream2 to this CEP.pattern function?
    val patternStream = CEP.pattern(partitionedStream1, patternForMatchingUserId)

    env.execute()
  }
}

In the Flink program above, I have two streams named partitionedStream1 and partitionedStream2, which are keyed by the userId.

I want to compare the data from both streams inside the patternForMatchingUserId pattern (similar to what I showed above). Is there a way to pass two streams to the CEP.pattern function?

Something like this:

val patternStream = CEP.pattern(partitionedStream1, partitionedStream2, patternForMatchingUserId)

Answers (1)

Dawid Wysakowicz

There is no way to pass two streams to CEP, but you can pass a single combined stream.

If both streams have the same type/schema, you can just union them. Note that union returns a plain DataStream, so the result has to be keyed again. I believe this solution matches your case.

partitionedStream1.union(partitionedStream2).keyBy(...)
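A minimal sketch of the union approach, assuming the Flink 1.x Scala APIs of the question's era (flink-streaming-scala and flink-cep-scala, with processing time as the default time characteristic). UserEvent, its source tag and the comma-based extractUserId are hypothetical stand-ins for the question's JSON handling, and fromElements replaces the Kafka sources. The point it illustrates: once the merged stream is keyed by userId, every CEP match only contains events of one user, so the "stream1 userId == stream2 userId" condition from the question is implied by the keying, and the pattern only has to tell the two sources apart.

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical event type: `source` records which topic the record came from.
case class UserEvent(source: String, userId: String, payload: String)

object UnionCepSketch {
  // Hypothetical stand-in for the question's extractUserId; a real job would parse the JSON.
  def extractUserId(json: String): String = json.takeWhile(_ != ',')

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-ins for kafkaStream1 / kafkaStream2 from the question.
    val stream1: DataStream[String] = env.fromElements("u1,login", "u2,login")
    val stream2: DataStream[String] = env.fromElements("u1,purchase")

    // Tag each record with its origin before merging, so the pattern can tell them apart.
    val tagged1 = stream1.map(s => UserEvent("topic1", extractUserId(s), s))
    val tagged2 = stream2.map(s => UserEvent("topic2", extractUserId(s), s))

    // union needs identical element types and returns a plain DataStream, so key it afterwards.
    val keyed = tagged1.union(tagged2).keyBy(_.userId)

    // Keyed input: each match only contains events of a single userId.
    val pattern = Pattern.begin[UserEvent]("fromTopic1")
      .where((e: UserEvent) => e.source == "topic1")
      .followedBy("fromTopic2")
      .where((e: UserEvent) => e.source == "topic2")
      .within(Time.minutes(10))

    val matches: DataStream[String] = CEP.pattern(keyed, pattern).select(
      (m: scala.collection.Map[String, Iterable[UserEvent]]) => {
        val first = m("fromTopic1").head
        val second = m("fromTopic2").head
        s"user ${first.userId}: ${first.payload} -> ${second.payload}"
      })

    matches.print()
    env.execute("union + CEP sketch")
  }
}
```

This pattern is order-sensitive: it only matches a topic1 event followed by a topic2 event within ten minutes. With the processing-time sources used here, the arrival order across the two inputs is not guaranteed, so the printed matches can differ between runs; in the question's event-time setup, CEP instead orders each key's events by timestamp.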

If they have different schemas, you can transform them into one stream with some custom logic, e.g. inside a coFlatMap.
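For inputs with different schemas, the connect + CoFlatMapFunction route could look like the sketch below, again assuming the Flink 1.x Scala DataStream API. CommonEvent, Purchase and the comma-based parsing are hypothetical; the idea is only that both sides are converted into one shared type, after which the result can be keyed and passed to CEP.pattern exactly like a union of same-typed streams.

```scala
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical common type that both inputs are converted into.
case class CommonEvent(source: String, userId: String, detail: String)

// Hypothetical second schema: a purchase record instead of a JSON string.
case class Purchase(userId: String, amount: Double)

// Converts each side of the connected streams into CommonEvent.
class ToCommonEvent extends CoFlatMapFunction[String, Purchase, CommonEvent] {
  // First input: comma-separated strings; lines without a user id are dropped.
  override def flatMap1(json: String, out: Collector[CommonEvent]): Unit = {
    val userId = json.takeWhile(_ != ',') // hypothetical parsing
    if (userId.nonEmpty) out.collect(CommonEvent("topic1", userId, json))
  }

  // Second input: purchases always carry a user id, so every record is emitted.
  override def flatMap2(p: Purchase, out: Collector[CommonEvent]): Unit =
    out.collect(CommonEvent("topic2", p.userId, s"purchase ${p.amount}"))
}

object ConnectSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val logins: DataStream[String] = env.fromElements("u1,login", ",malformed")
    val purchases: DataStream[Purchase] = env.fromElements(Purchase("u1", 9.99))

    // connect keeps both element types; the CoFlatMapFunction maps them to one type.
    val combined: DataStream[CommonEvent] = logins
      .connect(purchases)
      .flatMap(new ToCommonEvent)

    // The keyed result can now be handed to CEP.pattern.
    combined.keyBy(_.userId).print()
    env.execute("connect + coFlatMap sketch")
  }
}
```

flatMap is used rather than map so that records that cannot be converted (here, a line with an empty user id) are simply dropped instead of producing a broken event.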
