Gaurav Kumar

Reputation: 1091

Apache Flink: Watermark does not progress with Broadcast stream

There is one high-throughput Kafka stream, defined as follows:

val stream: DataStream[A] = flinkEnv
  .addSource(kafkaStreamSource)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
      .withIdleness(Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[A] {
        override def extractTimestamp(element: A, recordTimestamp: Long): Long =
          element.lastUpdatedAt
      })
  )
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .reduce(.......)

The watermark of the window operator above advances properly.

The DataStream[A] produced by that window operator needs to be enriched with information kept in some S3 files. The S3 files are updated very rarely.

The S3 file is read as a stream and then broadcast to enrich the elements of DataStream[A].

val textInputFormat = new TextInputFormat(new Path("s3 path...."))
val enrichWithElements: BroadcastStream[EnrichWithElement] = flinkEnv
  .readFile(textInputFormat, "s3 path ...", FileProcessingMode.PROCESS_CONTINUOUSLY, 30)
  .map(s3Element => {
    EnrichWithElement(.....)
  })
  .broadcast(new MapStateDescriptor......)

The two streams are then connected so that every element of type A is enriched with elements of type EnrichWithElement.

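class EnrichedAProcess
    extends BroadcastProcessFunction[A, EnrichWithElement, EnrichedAElement] {
  // Called for each element of the non-broadcast (main) stream.
  override def processElement(
      value: A,
      ctx: BroadcastProcessFunction[A, EnrichWithElement, EnrichedAElement]#ReadOnlyContext,
      out: Collector[EnrichedAElement]): Unit = {
      .....
      out.collect(EnrichedAElement(....))
  }

  // Called for each element of the broadcast stream.
  override def processBroadcastElement(
      value: EnrichWithElement,
      ctx: BroadcastProcessFunction[A, EnrichWithElement, EnrichedAElement]#Context,
      out: Collector[EnrichedAElement]): Unit = {
      .........
  }
}

stream
  .connect(enrichWithElements)
  .process(new EnrichedAProcess)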

EnrichedAProcess has two inputs. One of them forwards watermarks continuously, but the broadcast stream carries no timestamps or watermarks. As a result, the watermark of EnrichedAProcess never advances, because one of its inputs never emits a watermark.

Is there a way to make the watermark of EnrichedAProcess depend only on the non-broadcast input?

Upvotes: 0

Views: 570

Answers (1)

David Anderson

Reputation: 43439

An operator with multiple input channels sets its own watermark to the minimum of the latest watermarks received from all active channels.
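The rule above can be sketched in plain Scala, without any Flink dependency. This is only a simplified model, not Flink's actual implementation: the names `Channel`, `WatermarkMin`, and `combined` are made up for illustration, and a channel that has not yet received a watermark is modelled as sitting at `Long.MinValue`.

```scala
object WatermarkMin {
  // Hypothetical model of one input channel: the latest watermark it has
  // received, and whether the channel is currently marked idle.
  final case class Channel(lastWatermark: Long, idle: Boolean = false)

  // The operator's watermark is the minimum over all active (non-idle)
  // channels. With no active channel there is nothing to compute, so None.
  def combined(channels: Seq[Channel]): Option[Long] = {
    val active = channels.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(_.lastWatermark).min)
  }
}

object WatermarkMinDemo extends App {
  import WatermarkMin._

  // Main stream at watermark 1000, broadcast input never emitted a watermark:
  // the minimum stays at Long.MinValue, so the operator is stuck.
  println(combined(Seq(Channel(1000L), Channel(Long.MinValue))))

  // Broadcast input reports the maximum watermark: the minimum now follows
  // the main stream.
  println(combined(Seq(Channel(1000L), Channel(Long.MaxValue))))
}
```

In this model, an input that never reports a watermark pins the minimum at the lowest possible value, which matches the stalled behaviour described in the question.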

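What you can do is apply a WatermarkStrategy to the broadcast stream that always returns MAX_WATERMARK as its watermark. Because the operator takes the minimum over its inputs, the broadcast input then never holds the watermark back. (You don't need to worry about assigning timestamps for that stream.)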
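A minimal sketch of such a strategy, assuming a Flink version that has the `WatermarkStrategy` / `WatermarkGenerator` API from `org.apache.flink.api.common.eventtime` (the same API the question already uses). The names `MaxWatermarkGenerator` and `BroadcastWatermarks.alwaysMax` are hypothetical, introduced only for this example.

```scala
import org.apache.flink.api.common.eventtime.{
  Watermark, WatermarkGenerator, WatermarkGeneratorSupplier,
  WatermarkOutput, WatermarkStrategy
}

// Hypothetical generator: ignores events and, on every periodic emit,
// reports Watermark.MAX_WATERMARK (timestamp Long.MaxValue).
class MaxWatermarkGenerator[T] extends WatermarkGenerator[T] {
  override def onEvent(event: T, eventTimestamp: Long, output: WatermarkOutput): Unit = ()

  override def onPeriodicEmit(output: WatermarkOutput): Unit =
    output.emitWatermark(Watermark.MAX_WATERMARK)
}

object BroadcastWatermarks {
  // Hypothetical helper: a strategy whose watermark is always the maximum,
  // so this input never limits the downstream operator's watermark.
  def alwaysMax[T]: WatermarkStrategy[T] =
    WatermarkStrategy.forGenerator[T](new WatermarkGeneratorSupplier[T] {
      override def createWatermarkGenerator(
          context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[T] =
        new MaxWatermarkGenerator[T]
    })
}
```

In the question's pipeline this would presumably be applied with `.assignTimestampsAndWatermarks(BroadcastWatermarks.alwaysMax[EnrichWithElement])` on the mapped S3 stream, just before the `.broadcast(...)` call. The watermark is emitted from `onPeriodicEmit`, so it depends on the job's automatic watermark interval rather than on elements arriving, which suits a source that rarely produces data.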

Upvotes: 3
