Sergey Postument

Reputation: 177

Flink 1.12 timeWindowAll/timeWindow operators deprecation

I want to upgrade my Flink application from 1.11.0 to 1.13.2. My codebase stopped compiling due to the deprecation of StreamExecutionEnvironment.setStreamTimeCharacteristic, timeWindowAll() and timeWindow().

Some code

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 10000L))
    env.getConfig.setGlobalJobParameters(parameterTool)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // #1
    env.getConfig.setParallelism(parallelism)

    val packetSource = env
      .addSource(
        new FlinkKinesisConsumer[String](s"packet-stream-name", new SimpleStringSchema, consumerConfig))
      .map(json => read[MyCaseClass](json))
      .assignTimestampsAndWatermarks(WatermarkStrategy
        .forBoundedOutOfOrderness[MyCaseClass](Duration.ofSeconds(2))
        .withTimestampAssigner(new PacketWatermarkGenerator))
      .timeWindowAll(Time.seconds(2)) // #2
      .process(new OrderPacketWindowFunction)

    val heartbeatEvents = packets
      .timeWindow(Time.seconds(4)) // #3
      .process(new HeartbeatWindowFunction)

Since I already set env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime), it looks like I need to do the following:

  1. Remove the redundant config: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  2. Replace .timeWindowAll(Time.seconds(2)) with .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
  3. Replace .timeWindow(Time.seconds(4)) with .window(TumblingEventTimeWindows.of(Time.seconds(4)))

Upvotes: 1

Views: 657

Answers (1)

Sergey Postument

Reputation: 177

  1. remove the redundant config - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); since Flink 1.12 event time is the default time characteristic, so the call is no longer needed

  2. replace .timeWindowAll(Time.seconds(2)) -> .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))

  3. replace .timeWindow(Time.seconds(4)) -> .window(TumblingEventTimeWindows.of(Time.seconds(4)))
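
Put together, the three changes above might look roughly like the sketch below. It is only an illustration under assumptions, not the original job: the Packet case class, its deviceId key, the in-memory fromElements source and the counting window functions are hypothetical stand-ins for MyCaseClass, the Kinesis source, OrderPacketWindowFunction and HeartbeatWindowFunction.

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical event type standing in for MyCaseClass.
case class Packet(deviceId: String, timestampMs: Long)

object WindowMigrationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // (1) No setStreamTimeCharacteristic call: event time is the default since Flink 1.12.

    // Hypothetical in-memory source replacing the Kinesis consumer.
    val packets: DataStream[Packet] = env
      .fromElements(Packet("a", 1000L), Packet("b", 1500L), Packet("a", 3200L))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[Packet](Duration.ofSeconds(2))
          .withTimestampAssigner(new SerializableTimestampAssigner[Packet] {
            override def extractTimestamp(element: Packet, recordTimestamp: Long): Long =
              element.timestampMs
          }))

    // (2) was: .timeWindowAll(Time.seconds(2))
    val countsPerWindow: DataStream[Int] = packets
      .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
      .process(new ProcessAllWindowFunction[Packet, Int, TimeWindow] {
        override def process(context: Context, elements: Iterable[Packet], out: Collector[Int]): Unit =
          out.collect(elements.size) // emit the number of packets in each 2 s window
      })

    // (3) was: .timeWindow(Time.seconds(4)) on a keyed stream
    val countsPerDevice: DataStream[(String, Int)] = packets
      .keyBy(_.deviceId)
      .window(TumblingEventTimeWindows.of(Time.seconds(4)))
      .process(new ProcessWindowFunction[Packet, (String, Int), String, TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[Packet],
                             out: Collector[(String, Int)]): Unit =
          out.collect((key, elements.size)) // emit the per-device count for each 4 s window
      })

    countsPerWindow.print()
    countsPerDevice.print()
    env.execute("window-migration-sketch")
  }
}
```

Note that .window(...) is only available on a keyed stream, so the stream in step 3 has to go through keyBy first, just as timeWindow required before.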

Upvotes: 1
