Tom

Reputation: 6342

Don't understand interval join in Flink

From Flink's official doc:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/joining.html#interval-join

The example code is:

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

...

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    // join each orange element at time t with green elements in [t - 2 ms, t + 1 ms]
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(
            left: Integer,
            right: Integer,
            ctx: ProcessJoinFunction[Integer, Integer, String]#Context,
            out: Collector[String]): Unit = {
          out.collect(s"$left,$right")
        }
      })

From the above code, I would like to know how to specify a starting time (e.g., the beginning of today) from which to perform this interval join, so that data produced before the starting time is not taken into account.

For example, if the program has been running for 3 days, I don't want to perform the join over all 3 days of data; I only want to join the data generated today.
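A minimal sketch, assuming the cutoff is midnight in a chosen time zone: Flink event-time timestamps are milliseconds since the epoch, so "the beginning of today" first has to be turned into such a number. The object `StartOfToday` and the helper `startOfDayMillis` are hypothetical names; only `java.time` is used.

```scala
import java.time.{LocalDate, ZoneId}

object StartOfToday {
  // Epoch milliseconds of the first instant of `date` in `zone`
  // (atStartOfDay also copes with zones where midnight is skipped by DST).
  def startOfDayMillis(date: LocalDate, zone: ZoneId): Long =
    date.atStartOfDay(zone).toInstant.toEpochMilli

  def main(args: Array[String]): Unit = {
    // Assumption: "today" means today in the machine's default time zone.
    val zone = ZoneId.systemDefault()
    println(startOfDayMillis(LocalDate.now(zone), zone))
  }
}
```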

Upvotes: 0

Views: 1654

Answers (1)

Dominik Wosiński

Reputation: 3874

I don't think it works the way you think it does.

The interval is computed relative to the timestamp of each element of orangeStream. So you are not specifying which range of the data should be taken into account; rather, the interval acts like a window that determines which greenStream elements will be joined with a given element of the orange stream.

So, with the bounds described above, an orange element with timestamp 5 will be joined with green elements whose timestamps are between 3 and 6 (both bounds are inclusive by default).
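The bound check can be written out as a plain Scala predicate to reproduce that example. This is a sketch of the condition the Flink docs describe (`orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound`), not Flink's internal implementation; `IntervalBounds` and `joins` are hypothetical names.

```scala
object IntervalBounds {
  // True when a green element at greenTs falls inside the interval
  // [orangeTs + lowerBound, orangeTs + upperBound], both ends inclusive.
  def joins(orangeTs: Long, greenTs: Long, lowerBound: Long, upperBound: Long): Boolean =
    orangeTs + lowerBound <= greenTs && greenTs <= orangeTs + upperBound

  def main(args: Array[String]): Unit = {
    // Mirrors between(Time.milliseconds(-2), Time.milliseconds(1)) for an orange element at ts = 5.
    val matching = (0L to 10L).filter(g => joins(5L, g, -2L, 1L))
    println(matching.mkString(", "))
  }
}
```

Running `main` prints `3, 4, 5, 6`: the interval moves with every orange element, so it never pins the join to a fixed calendar range.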

I really don't think you can use it to join only part of the data. The only approach I can think of is to simply filter the data (both input streams) by timestamp and drop all elements that were generated before the starting time.
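To make the filtering idea concrete without a Flink cluster, here is an in-memory simulation, assuming each element carries its event timestamp: both inputs are filtered against a cutoff before the interval-join condition is applied. `Event`, `sinceCutoff`, and `intervalJoin` are hypothetical helpers, and the nested loop only mimics the join semantics; it is not how Flink executes the join.

```scala
object FilterThenJoin {
  // Hypothetical stand-in for a stream element: event timestamp (epoch ms), join key, payload.
  final case class Event(ts: Long, key: String, value: Int)

  // Keep only elements produced at or after the cutoff.
  def sinceCutoff(events: Seq[Event], cutoffTs: Long): Seq[Event] =
    events.filter(_.ts >= cutoffTs)

  // Naive nested-loop version of the keyed interval-join condition, for illustration only.
  def intervalJoin(orange: Seq[Event], green: Seq[Event], lower: Long, upper: Long): Seq[String] =
    for {
      o <- orange
      g <- green
      if o.key == g.key && o.ts + lower <= g.ts && g.ts <= o.ts + upper
    } yield s"${o.value},${g.value}"

  def main(args: Array[String]): Unit = {
    val cutoff = 100L // hypothetical "start of today"
    val orange = Seq(Event(99L, "k", 1), Event(101L, "k", 2))
    val green  = Seq(Event(98L, "k", 10), Event(100L, "k", 20), Event(102L, "k", 30))
    val joined = intervalJoin(sinceCutoff(orange, cutoff), sinceCutoff(green, cutoff), -2L, 1L)
    println(joined.mkString("; "))
  }
}
```

Filtering both sides matters: with a negative lower bound, green elements from just before the cutoff could otherwise still match orange elements that arrive shortly after it. In an actual job, one possible place for this check is a `ProcessFunction` on each stream, where `ctx.timestamp()` returns the element's event timestamp (it can be `null` if no timestamps were assigned).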

Upvotes: 2
