Reputation: 6342
For KeyedStream#timeWindow#process, I am wondering whether one window instance only contains elements with the same key, and whether different keys use different window instances.
From the output of the following application, I see that one window instance only contains the same key, and different keys use different windows.
But I would like to ask and confirm this. Thanks!
The test source is:
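import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.util.Random

class KeyByAndWindowAndProcessTestSource extends RichParallelSourceFunction[Int] {
  // Flipped by cancel() so that run() can terminate when the job is cancelled
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    val random = new Random()
    while (running) {
      // Emit the same random value three times, then pause for one second
      val i = random.nextInt(30)
      ctx.collect(i)
      ctx.collect(i)
      ctx.collect(i)
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}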
The application is:
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

object KeyByAndWindowTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
    val ds: DataStream[Int] = env.addSource(new KeyByAndWindowAndProcessTestSource)
    val ds2 = ds.keyBy(i => i).timeWindow(Time.seconds(4)).process(new MyProcessFunction())
    ds2.print()
    env.execute()
  }
}

class MyProcessFunction extends ProcessWindowFunction[Int, String, Int, TimeWindow] {
  override def process(
      key: Int,
      ctx: Context,
      vals: Iterable[Int],
      out: Collector[String]): Unit = {
    println(new java.util.Date())
    println(s"key=${key}, vals = ${vals.mkString(",")}, hashCode=${System.identityHashCode(ctx.window)}")
  }
}
The output is:
Sat Sep 14 13:08:24 CST 2019
key=26, vals = 26,26,26, hashCode=838523304
Sat Sep 14 13:08:24 CST 2019
key=28, vals = 28,28,28, hashCode=472721641
Sat Sep 14 13:08:24 CST 2019
key=18, vals = 18,18,18,18,18,18, hashCode=1668151956
Upvotes: 0
Views: 381
Reputation: 921
Original Answer:
I hope I get your question right...
ProcessWindowFunction#process will be invoked once for each window and key (or multiple times, depending on the window's trigger). Internally, window and key make up a composite partition key.
In terms of Java object instances, one instance of ProcessWindowFunction will deal with many keys. Specifically, there will be as many ProcessWindowFunction instances as the operator's degree of parallelism (one per parallel subtask).
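To make the "composite partition key" idea concrete, here is a minimal plain-Scala simulation (no Flink dependency) of what a single function instance sees: elements are bucketed by the pair (key, window), and each bucket corresponds to one `process` call. `Window`, `Element`, and `CompositeKeySketch` are hypothetical stand-ins, not Flink APIs, and the sketch assumes zero-offset tumbling windows that fire exactly once.

```scala
// Hypothetical stand-in for Flink's TimeWindow: only start/end, no data.
final case class Window(start: Long, end: Long)

// Hypothetical element: a key plus the processing time at which it arrived.
final case class Element(key: Int, arrivalMillis: Long)

object CompositeKeySketch {
  // Zero-offset tumbling window that contains the given timestamp.
  def windowFor(ts: Long, size: Long): Window = {
    val start = ts - (ts % size)
    Window(start, start + size)
  }

  // One call handles all keys (like one function instance), but the values are
  // bucketed per (key, window) pair; each entry stands for one process() call.
  def fireWindows(elems: Seq[Element], size: Long): Map[(Int, Window), Seq[Int]] =
    elems
      .groupBy(e => (e.key, windowFor(e.arrivalMillis, size)))
      .map { case (compositeKey, group) => compositeKey -> group.map(_.key) }
}
```

With a 4000 ms size, keys 26 and 28 arriving at 1000 ms and 2000 ms share the same start/end values, yet they end up in separate buckets, because the bucket is the pair and not the window alone.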
Follow Up:
So I did not get it right :)
For every record processed by the WindowOperator, a new Window object is created with the correct start/end time for that record.
This means that each invocation of ProcessWindowFunction#process will be passed a new Window object.
It is important to understand that a Window in Flink is a very light object, which is just used as an additional part (the namespace) of the overall key. It does not hold any data and/or logic.
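Because the window is only the namespace half of the key, what matters for finding the buffered elements is value equality of start and end, not object identity. The sketch below uses a hypothetical `TimeWindowLike` case class in place of Flink's `TimeWindow` (which, as far as I know, also derives `equals`/`hashCode` from start and end) and a plain mutable `Map` in place of Flink's keyed window state; `WindowStateSketch` is illustrative only.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's TimeWindow: start/end only, no data or logic.
final case class TimeWindowLike(start: Long, end: Long)

// Hypothetical stand-in for keyed window state, indexed by (key, namespace).
final class WindowStateSketch {
  private val state = mutable.Map.empty[(Int, TimeWindowLike), List[Int]]

  // Every call allocates a fresh window object, like the assigner does per record.
  def add(key: Int, value: Int, windowStart: Long, windowSize: Long): Unit = {
    val namespace = new TimeWindowLike(windowStart, windowStart + windowSize)
    state((key, namespace)) = value :: state.getOrElse((key, namespace), Nil)
  }

  // Lookup works with yet another fresh object, because equality is by value.
  def contents(key: Int, window: TimeWindowLike): List[Int] =
    state.getOrElse((key, window), Nil)
}
```

Two records for key 18 land in the same bucket even though each brought its own window object, while key 26 in the same time range gets a separate bucket.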
May I ask for the background of the question?
Upvotes: 0
Reputation: 161
Actually, for processing-time windows such as TumblingProcessingTimeWindows, a new window object is created for each element.
Here is the source code of TumblingProcessingTimeWindows#assignWindows:
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
    final long now = context.getCurrentProcessingTime();
    long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
    return Collections.singletonList(new TimeWindow(start, start + size));
}
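The arithmetic in that snippet can be reproduced outside Flink to show the point: two elements arriving within the same 4-second period get windows with identical start/end values, yet every call allocates a separate object. `windowStartWithOffset` mirrors what `TimeWindow.getWindowStartWithOffset` computes in the Flink 1.9-era source I have looked at (`timestamp - (timestamp - offset + windowSize) % windowSize`; later versions handle negative remainders differently). `SimpleWindow` and `TumblingAssignerSketch` are hypothetical, and the current processing time is passed in explicitly instead of being read from a `WindowAssignerContext`.

```scala
// Hypothetical stand-in for Flink's TimeWindow.
final case class SimpleWindow(start: Long, end: Long)

object TumblingAssignerSketch {
  // Same arithmetic as the Flink 1.9-era TimeWindow.getWindowStartWithOffset.
  def windowStartWithOffset(timestamp: Long, offset: Long, windowSize: Long): Long =
    timestamp - (timestamp - offset + windowSize) % windowSize

  // Mirrors assignWindows: one brand-new window object per call (i.e. per element).
  def assignWindows(now: Long, offset: Long, size: Long): Seq[SimpleWindow] = {
    val start = windowStartWithOffset(now, offset, size)
    Seq(new SimpleWindow(start, start + size))
  }
}
```

Elements at 1000 ms and 3500 ms both map to the window [0, 4000), and the two results compare equal while being different objects, which is exactly why an identity-based hash code cannot tell you whether keys "share" a window.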
So System.identityHashCode will in practice return a different hash code for different keys, simply because it reflects object identity rather than the window's start/end values, and your test code does not prove anything.
Under the hood, elements are grouped by the composite key elementKey + assignedWindow, so I think it's right to say "one window instance will only contain the same key, and different keys will use different window instances".
Upvotes: 2