user5228393


How to use a where clause with pattern matching (CEP) in Flink?

I have the following Flink program:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))

val partitionedInput = dataStream.keyBy(jsonString => {
  val jsonParser = new JsonParser()
  val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
  jsonObject.get("account")
})

val pattern = Pattern.begin[String]("start").where(jsonString => 
            val jsonParser = new JsonParser()
            val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
            jsonObject.get("account") == "iOS") //ERROR HERE

val patternStream = CEP.pattern(partitionedInput, pattern)

I am getting an error at the val pattern = ... line saying Expected IterativeCondition[String], actual: (Nothing) => Unit.

My dataStream consists of JSON strings, which I parse in keyBy to key the stream by the account field of each JSON object. I then try to create a pattern on the keyed stream, but the pattern definition fails with the error above.

Upvotes: 0

Views: 191

Answers (1)

Dawid Wysakowicz

Reputation: 3422

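Make sure you use the proper API. The where method of the Java Pattern class expects an IterativeCondition, while the Scala Pattern class also accepts a plain Scala function. For Scala, you should import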

import org.apache.flink.cep.scala.pattern.Pattern

rather than

import org.apache.flink.cep.pattern.Pattern
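
For illustration, here is a minimal sketch of how the pattern from the question might look with the Scala CEP imports in place. The object name IosPatternSketch, the helper accountOf, and the in-memory sample elements are assumptions made for this example (the question reads from Kafka instead). The sketch also compares the account value as a String via getAsString, since Gson's get returns a JsonElement, which never equals a String.

```scala
import com.google.gson.JsonParser
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._

object IosPatternSketch {

  // Hypothetical helper: parses the JSON string and returns the "account"
  // field as a String. Assumes every record carries that field.
  def accountOf(jsonString: String): String =
    new JsonParser().parse(jsonString).getAsJsonObject.get("account").getAsString

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in for the Kafka source in the question (assumed sample data).
    val dataStream: DataStream[String] =
      env.fromElements("""{"account":"iOS"}""", """{"account":"Android"}""")

    // Key by the account value as a String rather than as a JsonElement.
    val partitionedInput = dataStream.keyBy((json: String) => accountOf(json))

    // With the Scala Pattern import, where accepts a String => Boolean function.
    val pattern = Pattern
      .begin[String]("start")
      .where((json: String) => accountOf(json) == "iOS")

    // Uses the Scala CEP object, which takes the Scala DataStream.
    val patternStream = CEP.pattern(partitionedInput, pattern)
  }
}
```

The sketch stops at building the pattern stream, as the question does. Selecting matches and calling env.execute are left out because their exact signatures depend on the Flink version in use.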

Upvotes: 1
