Reputation:
I have this Flink program below:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val pattern = Pattern.begin[String]("start").where(jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account") == "iOS") //ERROR HERE
val patternStream = CEP.pattern(partitionedInput, pattern)
I am getting an error at the val pattern = ...
line saying Expected IterativeCondition[String], actual: (Nothing) => Unit
.
My dataStream
consists of JSON objects which I parse in the keyBy
to key by the account key inside the JSON object. Then I am trying to create a pattern but I am getting an error when creating the pattern.
Upvotes: 0
Views: 191
Reputation: 3422
Make sure you use the proper API. For scala you should import
import org.apache.flink.cep.scala.pattern.Pattern
rather than
import org.apache.flink.cep.pattern.Pattern
Upvotes: 1