FredT

Reputation: 11

How can I do a lazy match with Flink CEP?

I want to use FlinkCEP to do only a 'lazy' match on a pattern. How can I do this? For example, I have an input stream ACABCABCB and I want to match A followedBy C so that I get only 3 matches instead of 6.

I've created the following example to illustrate my problem.

import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.createLocalEnvironment(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

case class MyEvent(id: Int, kind: String, value: String)
case class MyAggregatedEvent(id: Int, concatenatedValue: String)

val eventStream = env.fromElements(
  MyEvent(1, "A", "1"), MyEvent(1, "C", "1"),
  MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"),
  MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"),
  MyEvent(1, "B", "3")
)

// next = strict contiguity: the C must come directly after the A
val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .next("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))

val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern)

// Turn each match into one aggregated event "valueOfA=>valueOfC"
val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect {
  (matched: scala.collection.mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) =>
    val partA = matched("pA")
    val partC = matched("pC")

    collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value))
}
outNextStream.print()

env.execute("Experiment")

This gives me the following output:

MyAggregatedEvent(1,1=>1)
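To see why next yields a single match, here is a plain-Scala model of strict contiguity: an A counts only when the very next event is a C. This is an illustrative sketch, not Flink code; it ignores the 5-second window, and the names StrictContiguityModel, Ev and nextMatches are hypothetical.

```scala
// Illustrative model of strict contiguity (not Flink code); the time window is ignored.
object StrictContiguityModel {
  final case class Ev(kind: String, value: String)

  // Kinds and values of the question's input stream, in arrival order (id omitted).
  val input: List[Ev] = List(
    Ev("A", "1"), Ev("C", "1"), Ev("A", "2"), Ev("B", "1"), Ev("C", "2"),
    Ev("A", "3"), Ev("D", "2"), Ev("C", "3"), Ev("B", "3"))

  // Look at every pair of adjacent events and keep only the A-then-C pairs.
  def nextMatches(events: List[Ev]): List[String] =
    events.sliding(2).collect {
      case List(a, c) if a.kind == "A" && c.kind == "C" => s"${a.value}=>${c.value}"
    }.toList

  def main(args: Array[String]): Unit =
    nextMatches(input).foreach(println)
}
```

On the question's input only the first A is directly followed by a C (the other two are followed by B and D), so nextMatches(input) returns List("1=>1"), which agrees with the output above.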

When I change the pattern to:

val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .followedBy("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))

Then the following is printed:

MyAggregatedEvent(1,1=>1)
MyAggregatedEvent(1,1=>2)
MyAggregatedEvent(1,2=>2)
MyAggregatedEvent(1,1=>3)
MyAggregatedEvent(1,2=>3)
MyAggregatedEvent(1,3=>3)
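The six matches are consistent with every partial match staying open: each C completes a match with every A seen before it, and nothing is discarded afterwards. The plain-Scala sketch below models that behaviour under stated assumptions (it is not Flink's implementation, the window is ignored, and AllMatchesModel, Ev and allMatches are hypothetical names).

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative model only (not Flink code); the 5-second window is ignored.
object AllMatchesModel {
  final case class Ev(kind: String, value: String)

  // Kinds and values of the question's input stream, in arrival order (id omitted).
  val input: List[Ev] = List(
    Ev("A", "1"), Ev("C", "1"), Ev("A", "2"), Ev("B", "1"), Ev("C", "2"),
    Ev("A", "3"), Ev("D", "2"), Ev("C", "3"), Ev("B", "3"))

  def allMatches(events: List[Ev]): List[String] = {
    val openAs = ArrayBuffer.empty[Ev] // partial matches still waiting for a C
    val out = ArrayBuffer.empty[String]
    events.foreach { e =>
      e.kind match {
        case "A" => openAs += e
        // Each C completes a match with every open A; no partial match is dropped.
        case "C" => openAs.foreach(a => out += s"${a.value}=>${e.value}")
        case _   => () // other kinds are skipped and leave open matches intact
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit =
    allMatches(input).foreach(println)
}
```

Because matches are emitted when their C arrives, allMatches(input) returns 1=>1, 1=>2, 2=>2, 1=>3, 2=>3, 3=>3, the same six pairs in the same order as the followedBy output.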

How can I create a pattern that matches each event only once, so that my output will be:

MyAggregatedEvent(1,1=>1)
MyAggregatedEvent(1,2=>2)
MyAggregatedEvent(1,3=>3)

Upvotes: 1

Views: 271

Answers (1)

Till Rohrmann

Reputation: 13346

At the moment this is not supported by Flink's CEP library; the matching semantics cannot be controlled yet. I think it would be good to add a MATCH_ALL and a MATCH_FIRST mode to begin with. MATCH_FIRST would discard all intermediate states once it has seen a fully matching sequence. This should cover your use case.
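The proposed MATCH_FIRST semantics can be modelled in a few lines of plain Scala. This is only a sketch of the idea, not an existing Flink API: MatchFirstModel, Ev and matchFirst are hypothetical names, the window is ignored, and when several partial matches complete on the same C it assumes that only the one that started earliest is emitted.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the proposed MATCH_FIRST mode (not a Flink API); the time window is ignored.
object MatchFirstModel {
  final case class Ev(kind: String, value: String)

  // Kinds and values of the question's input stream, in arrival order (id omitted).
  val input: List[Ev] = List(
    Ev("A", "1"), Ev("C", "1"), Ev("A", "2"), Ev("B", "1"), Ev("C", "2"),
    Ev("A", "3"), Ev("D", "2"), Ev("C", "3"), Ev("B", "3"))

  def matchFirst(events: List[Ev]): List[String] = {
    val openAs = ArrayBuffer.empty[Ev] // intermediate states (partial matches)
    val out = ArrayBuffer.empty[String]
    events.foreach { e =>
      e.kind match {
        case "A" => openAs += e
        case "C" if openAs.nonEmpty =>
          // Assumption: report the partial match that started earliest.
          out += s"${openAs.head.value}=>${e.value}"
          // A full match was seen, so every intermediate state is discarded.
          openAs.clear()
        case _ => () // no open match, or an irrelevant kind: nothing to do
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit =
    matchFirst(input).foreach(println)
}
```

On the question's input matchFirst(input) yields 1=>1, 2=>2 and 3=>3, the desired output. On the sequence A, A, C it yields only 1=>1, because the second A's partial match is discarded together with the first, so no event takes part in more than one match.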

Upvotes: 1
