Tom

Reputation: 6342

What's the relationship between the key and the Window instance in KeyedStream#timeWindow#process?

For KeyedStream#timeWindow#process, I am wondering whether one window instance only ever contains elements of a single key, and whether different keys use different window instances.

From the output of the following application, I see that one window instance only contains a single key, and that different keys use different windows.

But I would like to ask and confirm this. Thanks!

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.util.Random

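class KeyByAndWindowAndProcessTestSource extends RichParallelSourceFunction[Int] {
  // Set to false by cancel() so that run() returns when the job is cancelled.
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    val random = new Random()

    while (running) {
      // Emit the same random value (0..29) three times, then wait one second.
      val i = random.nextInt(30)
      ctx.collect(i)
      ctx.collect(i)
      ctx.collect(i)

      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}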

The application is:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._


object KeyByAndWindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val ds: DataStream[Int] = env.addSource(new KeyByAndWindowAndProcessTestSource)
    // Key by the value itself, then apply 4-second tumbling (processing-time) windows.
    val ds2 = ds.keyBy(i => i).timeWindow(Time.seconds(4)).process(new MyProcessFunction())
    ds2.print()
    env.execute()
  }

}


class MyProcessFunction extends ProcessWindowFunction[Int, String, Int, TimeWindow] {


  override def process(
                        key: Int,
                        ctx: Context,
                        vals: Iterable[Int],
                        out: Collector[String]): Unit = {

    // Prints directly to stdout; nothing is emitted to `out`, so ds2.print() shows nothing.
    println(new java.util.Date())

    println(s"key=${key}, vals = ${vals.mkString(",")}, hashCode=${System.identityHashCode(ctx.window)}")
  }
}

The output is:

Sat Sep 14 13:08:24 CST 2019
key=26, vals = 26,26,26, hashCode=838523304
Sat Sep 14 13:08:24 CST 2019
key=28, vals = 28,28,28, hashCode=472721641
Sat Sep 14 13:08:24 CST 2019
key=18, vals = 18,18,18,18,18,18, hashCode=1668151956

Upvotes: 0

Views: 381

Answers (2)

snntrable

Reputation: 921

Original Answer:

I hope I understood your question correctly...

ProcessWindowFunction#process is invoked once for each combination of window and key (or multiple times, depending on the window's trigger). Internally, window and key together make up a composite partition key.
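To make the "composite partition key" idea concrete, here is a minimal, Flink-free sketch in plain Scala. It assumes tumbling windows of a fixed size with zero offset and non-negative timestamps, and uses a hypothetical `Element(key, timestampMs)` record; it only models the grouping, not Flink's actual WindowOperator or state backend.

```scala
object CompositeKeyGrouping {
  // Hypothetical input record: the key plus a processing-time timestamp in ms.
  final case class Element(key: Int, timestampMs: Long)

  // Start of the tumbling window containing `ts` (zero offset, ts >= 0 assumed).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // Group elements by the composite (key, windowStart) pair.
  def group(elements: Seq[Element], sizeMs: Long): Map[(Int, Long), Seq[Element]] =
    elements.groupBy(e => (e.key, windowStart(e.timestampMs, sizeMs)))

  def main(args: Array[String]): Unit = {
    val elements = Seq(
      Element(26, 1000L), Element(26, 1000L),
      Element(18, 1500L), Element(18, 3500L),
      Element(18, 4200L) // falls into the next 4 s window
    )
    val groups = group(elements, sizeMs = 4000L)
    // With a trigger that fires once, each group roughly corresponds to one process() call.
    groups.toSeq.sortBy(_._1).foreach { case ((k, start), es) =>
      println(s"key=$k, window=[$start, ${start + 4000L}), count=${es.size}")
    }
  }
}
```

Every group holds elements of exactly one key, because the key is part of the grouping key itself.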

In terms of Java object instances, one ProcessWindowFunction instance deals with many keys. Specifically, there are as many ProcessWindowFunction instances as the operator's degree of parallelism.
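A rough sketch of why one instance sees many keys, assuming a simple `hashCode`-modulo partitioner. Flink's real assignment goes through key groups (a hash of the key mapped to a key group, then to a subtask), so the concrete subtask indices below will not match Flink's; `subtaskFor` and `keysBySubtask` are hypothetical helpers.

```scala
object KeysPerInstance {
  // Simplified partitioner (NOT Flink's key-group assignment): key hash modulo parallelism.
  def subtaskFor(key: Int, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Which keys end up at which parallel instance of the window function.
  def keysBySubtask(keys: Seq[Int], parallelism: Int): Map[Int, Seq[Int]] =
    keys.distinct.groupBy(k => subtaskFor(k, parallelism))

  def main(args: Array[String]): Unit = {
    val assignment = keysBySubtask(0 until 30, parallelism = 4)
    assignment.toSeq.sortBy(_._1).foreach { case (subtask, ks) =>
      println(s"subtask $subtask handles ${ks.size} keys: ${ks.mkString(",")}")
    }
  }
}
```

With `env.setParallelism(1)`, as in the question, every key lands on the single instance.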

Follow Up:

So I did not get it right :)

For every record processed by the WindowOperator, a new Window object is created with the correct start/end time for that record.

This means that each invocation of ProcessWindowFunction#process is passed a new Window object.

It is important to understand that a Window in Flink is a very lightweight object, which is only used as an additional part (the namespace) of the overall key. It does not hold any data or logic.
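A small sketch of "the window is just a namespace", assuming a hypothetical `SimpleTimeWindow` case class as a stand-in for Flink's TimeWindow (which, as far as I know, also defines `equals` on start and end only). Each record gets a freshly allocated window, yet records with the same key and bounds still land under the same state entry, because lookup uses value equality, not object identity.

```scala
object WindowAsNamespace {
  // Hypothetical stand-in for TimeWindow: only start/end, no data, no logic.
  final case class SimpleTimeWindow(start: Long, end: Long)

  // Builds a NEW window object on every call (tumbling, zero offset assumed).
  def assign(ts: Long, sizeMs: Long): SimpleTimeWindow = {
    val start = ts - (ts % sizeMs)
    SimpleTimeWindow(start, start + sizeMs)
  }

  // Toy keyed state: each (key, value, timestamp) record is appended under
  // the composite (key, window) namespace.
  def accumulate(records: Seq[(Int, Int, Long)], sizeMs: Long): Map[(Int, SimpleTimeWindow), List[Int]] =
    records.foldLeft(Map.empty[(Int, SimpleTimeWindow), List[Int]]) {
      case (acc, (key, value, ts)) =>
        val ns = (key, assign(ts, sizeMs))
        acc.updated(ns, acc.getOrElse(ns, Nil) :+ value)
    }

  def main(args: Array[String]): Unit = {
    val w1 = assign(1000L, 4000L)
    val w2 = assign(3000L, 4000L)
    println(s"same instance: ${w1 eq w2}") // separate allocations
    println(s"equal windows: ${w1 == w2}") // same start/end
    println(accumulate(Seq((18, 18, 1000L), (18, 18, 3000L), (26, 26, 1000L)), 4000L))
  }
}
```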

May I ask for the background of the question?

Upvotes: 0

Victor Wong

Reputation: 161

Actually, for processing-time windows, a new window object is created for each element.

Here is the source code of TumblingProcessingTimeWindows#assignWindows:

    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
        return Collections.singletonList(new TimeWindow(start, start + size));
    }
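For reference, a Scala transcription of the Java snippet above. The window-start formula mirrors what, to my knowledge, `TimeWindow.getWindowStartWithOffset` computes in Flink 1.x (`timestamp - (timestamp - offset + windowSize) % windowSize`); treat it as a sketch rather than the library's code. `Bounds` is a hypothetical stand-in for `TimeWindow`, and a Scala `List` plays the role of `Collections.singletonList`.

```scala
object ProcessingTimeAssignerSketch {
  // Hypothetical stand-in for TimeWindow: [start, end) in epoch milliseconds.
  final case class Bounds(start: Long, end: Long)

  // Start of the window containing `timestamp`, for windows of `size` shifted by `offset`.
  def windowStartWithOffset(timestamp: Long, offset: Long, size: Long): Long =
    timestamp - (timestamp - offset + size) % size

  // Mirrors assignWindows: every call allocates a new list and a new window object.
  def assignWindows(now: Long, offset: Long, size: Long): List[Bounds] = {
    val start = windowStartWithOffset(now, offset, size)
    List(Bounds(start, start + size))
  }

  def main(args: Array[String]): Unit = {
    val size = 4000L
    val a = assignWindows(now = 10500L, offset = 0L, size = size).head // e.g. an element with key 26
    val b = assignWindows(now = 11900L, offset = 0L, size = size).head // e.g. an element with key 28
    println(s"a=$a, b=$b, equal=${a == b}, sameObject=${a eq b}")
    println(assignWindows(now = 10500L, offset = 1000L, size = size)) // windows shifted by 1 s
  }
}
```

Both elements get windows with identical bounds, [8000, 12000), but as two distinct objects, which is why comparing identity hash codes says nothing about how keys and windows relate.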

So System.identityHashCode will (practically always) return a different hash code for different keys, simply because each element gets its own window object; your test code therefore does not prove anything.

Under the hood, elements are grouped by the composite of the element's key and its assigned window, so I think it is right to say that "one window instance will only contain the same key, and different keys will use different window instances".

Upvotes: 2
