Reputation: 177
I want to update my flink application 1.11.0 -> 1.13.2
My codebase stopped compiling due to depreciation of StreamExecutionEnvironment.setStreamTimeCharacteristic
,timeWindow()
Some code
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20,10000L))
env.getConfig.setGlobalJobParameters(parameterTool)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) #1
env.getConfig.setParallelism(parallelism)
val packetSource = env
.addSource(
new FlinkKinesisConsumer[String](s"packet-stream-name", new SimpleStringSchema, consumerConfig))
.map(json => read[MyCaseClass](json))
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness[MyCaseClass](Duration.ofSeconds(2))
.withTimestampAssigner(new PacketWatermarkGenerator))
.timeWindowAll(Time.seconds(2)) #2
.process(new OrderPacketWindowFunction)
val heartbeatEvents = packets
.timeWindow(Time.seconds(4)) #3
.process(new HeartbeatWindowFunction)
Since i have env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
config, looks like i need to do following
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
.timeWindowAll(Time.seconds(2))
-> .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
.timeWindow(Time.seconds(4))
-> .window(TumblingEventTimeWindows.of(Time.seconds(4)))
Upvotes: 1
Views: 657
Reputation: 177
remove the redundant config - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
replace .timeWindowAll(Time.seconds(2))
-> .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
replace .timeWindow(Time.seconds(4))
-> .window(TumblingEventTimeWindows.of(Time.seconds(4)))
Upvotes: 1