Reputation: 539
I'm losing my mind. I've spent 10 hours on this and it still isn't working!
I'm using a Flink session window, with event time, to join two streams.
The two streams are joined on a field that holds the same value in both.
The code is as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
// current time
long currentTime = System.currentTimeMillis();
// start time (in milliseconds) from which to consume the DataHub topic
final long START_TIME_MS = 0L;
// session window gap
final Time WIN_GAP_TIME = Time.seconds(10L);
// maxOutOfOrderness (in milliseconds) used for the source streams
final Time maxOutOfOrderness = Time.milliseconds(5L);
// init source function
DatahubSourceFunction oldTableASourceFun =
    new DatahubSourceFunction(
        endPoint,
        projectName,
        topicOldTableA,
        accessId,
        accessKey,
        // currentTime,
        START_TIME_MS,
        Long.MAX_VALUE,
        20,
        1000,
        20);
DatahubSourceFunction tableBSourceFun =
    new DatahubSourceFunction(
        endPoint,
        projectName,
        topicTableB,
        accessId,
        accessKey,
        START_TIME_MS,
        Long.MAX_VALUE,
        20,
        1000,
        20);
// init source
DataStream<OldTableA> oldTableADataStream =
    env.addSource(oldTableASourceFun)
        .flatMap(
            new FlatMapFunction<List<RecordEntry>, OldTableA>() {
              @Override
              public void flatMap(List<RecordEntry> list, Collector<OldTableA> out)
                  throws Exception {
                for (RecordEntry recordEntry : list) {
                  out.collect(CommonUtils.convertToOldTableA(recordEntry));
                }
              }
            })
        .uid("oldTableADataSource")
        .setParallelism(1)
        .returns(new TypeHint<OldTableA>() {})
        .assignTimestampsAndWatermarks(new ExtractorWM<OldTableA>(maxOutOfOrderness));
DataStream<TableB> tableBDataStream =
    env.addSource(tableBSourceFun)
        .flatMap(
            new FlatMapFunction<List<RecordEntry>, TableB>() {
              @Override
              public void flatMap(java.util.List<RecordEntry> list, Collector<TableB> out)
                  throws Exception {
                for (RecordEntry recordEntry : list) {
                  out.collect(CommonUtils.convertToTableB(recordEntry));
                }
              }
            })
        .uid("tableBDataSource")
        .setParallelism(1)
        .returns(new TypeHint<TableB>() {})
        .assignTimestampsAndWatermarks(new ExtractorWM<TableB>(maxOutOfOrderness));
The ExtractorWM code is as follows:
public class ExtractorWM<T extends CommonPOJO> extends BoundedOutOfOrdernessTimestampExtractor<T> {
  public ExtractorWM(Time maxOutOfOrderness) {
    super(maxOutOfOrderness);
  }

  @Override
  public long extractTimestamp(T element) {
    /* it's ok System.out.println(element +"-"+ CommonUtils.getSimpleDateFormat().format(getCurrentWatermark().getTimestamp()));*/
    return System.currentTimeMillis();
  }
}
I tested the code above and its output is correct: the events and the watermarks look right.
// oldTableADataStream events and watermark timestamps
OldTableA{PA1=1, a2='a20', **fa3=20**, fa4=30} 1596092987721
OldTableA{PA1=2, a2='a20', **fa3=20**, fa4=31} 1596092987721
OldTableA{PA1=3, a2='a20', **fa3=20**, fa4=32} 1596092987721
OldTableA{PA1=4, a2='a20', **fa3=20**, fa4=33} 1596092987721
OldTableA{PA1=5, a2='a20', **fa3=20**, fa4=34} 1596092987722
// tableBDataStream events and watermark timestamps
TableB{**PB1=20**, B2='b20', B3='b30'} 1596092987721
TableB{PB1=21, B2='b21', B3='b31'} 1596092987721
TableB{PB1=22, B2='b22', B3='b32'} 1596092987721
TableB{PB1=23, B2='b23', B3='b33'} 1596092987722
TableB{PB1=24, B2='b24', B3='b34'} 1596092987722
I expect the result to be:
1 a20 20 b20 b30 30
2 a20 20 b20 b30 31
3 a20 20 b20 b30 32
4 a20 20 b20 b30 33
5 a20 20 b20 b30 34
6 a20 20 b20 b30 35
but the join operator does not work:
DataStream<NewTableA> join1 =
    oldTableADataStream
        .join(tableBDataStream)
        .where(t1 -> t1.getFa3()) // printing the key shows the right values
        .equalTo(t2 -> t2.getPb1()) // printing the key shows the right values
        .window(EventTimeSessionWindows.withGap(WIN_GAP_TIME))
        // .trigger(new TestTrigger())
        // .allowedLateness(Time.seconds(2))
        .apply(new oldTableAJoinTableBFunc()); // does not work: the join method is never called
join1.print(); // prints nothing
The oldTableAJoinTableBFunc code is as follows:
public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {
  @Override
  public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {
    // Not working: a breakpoint set on the next line is never hit when debugging
    System.out.println(
        oldTableA
            + " - "
            + tableB
            + " - "
            + CommonUtils.getSimpleDateFormat().format(System.currentTimeMillis()));
    NewTableA newTableA = new NewTableA();
    newTableA.setPA1(oldTableA.getPa1());
    newTableA.setA2(oldTableA.getA2());
    newTableA.setFA3(oldTableA.getFa3());
    newTableA.setFA4(oldTableA.getFa4());
    newTableA.setB2(tableB.getB2());
    newTableA.setB3(tableB.getB3());
    return newTableA;
  }
}
The problem I see is with apply(new oldTableAJoinTableBFunc()): I set a breakpoint in the join method and debugged, but the breakpoint is never hit, so the join method is never called. From reading the source code, join should be called whenever a matching pair occurs. I printed t1.getFa3() and t2.getPb1(), and at least one pair matches on the value 20, so why is join not called?
Upvotes: 0
Views: 1004
Reputation: 43439
Your approach to handling time and watermarking is why this isn't working. For a more in-depth introduction to the topic, see the section of the Flink training course that covers event time and watermarking.
The motivation behind event time processing is to be able to implement consistent, deterministic streaming analytics despite events arriving out-of-order and being processed at some unknown rate. This depends on the events carrying timestamps, and on those timestamps being extracted from the events by a timestamp assigner. Your timestamp assigner is returning System.currentTimeMillis(), which effectively disables all of the event time machinery. Moreover, because you are using System.currentTimeMillis() as the source of timing information, your events cannot be out-of-order, yet you are specifying a watermarking delay of 5 msec.
I doubt your job even runs for 5 msec, so it may not be generating any watermarks at all. Plus, it will actually take 200 msec before Flink sends the first watermark (see below).
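To make the contrast concrete, here is a minimal plain-Java sketch (no Flink dependency) of the two ways of assigning a timestamp. The Event class and its eventTimeMs field are hypothetical; the POJOs in the question do not show such a field, so treat this as an assumption about what a proper event would carry.

```java
class EventTimeExtractionSketch {
    /** Hypothetical event type; eventTimeMs is an assumed field, not part of the question's POJOs. */
    static final class Event {
        final int key;
        final long eventTimeMs; // when the event occurred, as recorded by the producer

        Event(int key, long eventTimeMs) {
            this.key = key;
            this.eventTimeMs = eventTimeMs;
        }
    }

    /** Event-time style: the timestamp comes from the event, so a replay gives the same result. */
    static long extractEventTimestamp(Event event) {
        return event.eventTimeMs;
    }

    /** Wall-clock style, as in the question: the result depends on when the event is processed. */
    static long extractWallClockTimestamp(Event event) {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        Event event = new Event(20, 1596092987721L);
        System.out.println(extractEventTimestamp(event)); // prints 1596092987721
    }
}
```

In the job from the question, the same idea would go into extractTimestamp of the extractor, returning a field of the element instead of the current time.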
For a session to end, there will have to be a 10 second interval during which no events are processed. (If you switch to using proper event time timestamps, then those timestamps will need a gap of 10+ seconds, but since you are using System.currentTimeMillis as the source of timing info, your job needs a gap of 10 real-time seconds to close a session.)
A BoundedOutOfOrdernessTimestampExtractor generates watermarks by observing the timestamps in the stream, and every 200 msec (by default) it injects a Watermark into the stream whose value is computed by taking the largest timestamp seen so far in the event stream, and subtracting from it the bounded delay (5 msec). A 10 second long event time session window will only close when a Watermark arrives that is at least 10 seconds later than the timestamp of the latest event currently in the session. For such a watermark to be created, a suitable event with a sufficiently large timestamp has to be processed.
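The arithmetic described above can be sketched in plain Java (no Flink dependency). This is a simplified model, not Flink's implementation: the class and method names are made up for illustration, and the "minus 1" in the closing condition reflects my understanding that a window covering [start, end) fires once the watermark reaches end - 1.

```java
class SessionWatermarkSketch {
    static final long GAP_MS = 10_000L;             // session gap from the question
    static final long MAX_OUT_OF_ORDERNESS_MS = 5L; // bounded delay from the question

    /** Watermark value: largest timestamp seen so far minus the bounded delay. */
    static long watermarkFor(long maxTimestampSeen) {
        return maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS;
    }

    /** A session whose latest event is at lastSessionTs spans up to lastSessionTs + GAP_MS. */
    static boolean sessionCloses(long lastSessionTs, long watermark) {
        return watermark >= lastSessionTs + GAP_MS - 1;
    }

    public static void main(String[] args) {
        long lastSessionTs = 1596092987722L; // latest timestamp in the question's output

        // Only the session's own events have been seen: the watermark trails the last event.
        System.out.println(sessionCloses(lastSessionTs, watermarkFor(lastSessionTs))); // prints false

        // A later event, at least gap + delay after the last one, pushes the watermark far enough.
        long laterEventTs = lastSessionTs + GAP_MS + MAX_OUT_OF_ORDERNESS_MS;
        System.out.println(sessionCloses(lastSessionTs, watermarkFor(laterEventTs))); // prints true
    }
}
```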
Upvotes: 2
Reputation: 539
I found the answer!
The cause is DatahubSourceFunction, which consumes a DataHub topic (DataHub is an Aliyun service similar to Kafka) and emits the records into Flink. When no more records are consumed, the same watermark timestamp is produced over and over again.
I use BoundedOutOfOrdernessTimestampExtractor to generate watermarks, and it derives each watermark from the timestamps extracted from events. So the watermark advances only when an event arrives, and keeps the same value while there are no events.
Once DatahubSourceFunction has consumed the last record and emitted the last event, no more events are emitted.
From then on, the timestamp of the last watermark is essentially the same as the timestamp of the last event (System.currentTimeMillis()). The session window never ends, because every watermark timestamp is less than the timestamp of the last event plus the window gap.
Since the session window never ends, the join function is never called.
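The stall can be illustrated with a small plain-Java simulation (no Flink dependency). It is a simplified model with made-up names: it assumes a watermark is emitted every 200 ms from the largest timestamp seen, and that the window fires once the watermark reaches the last event's timestamp plus the gap, minus 1.

```java
class IdleSourceSketch {
    static final long GAP_MS = 10_000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 5L;
    static final long WATERMARK_INTERVAL_MS = 200L;

    /** Counts the periodic watermarks that would fire the session window while the source is idle. */
    static int firingWatermarks(long lastEventTs, long idleWallClockMs) {
        long maxTimestampSeen = lastEventTs; // never advances: no more events arrive
        int firing = 0;
        for (long elapsed = 0; elapsed <= idleWallClockMs; elapsed += WATERMARK_INTERVAL_MS) {
            long watermark = maxTimestampSeen - MAX_OUT_OF_ORDERNESS_MS; // same value every time
            if (watermark >= lastEventTs + GAP_MS - 1) {
                firing++;
            }
        }
        return firing;
    }

    public static void main(String[] args) {
        // Even after a full minute of wall-clock time, no watermark closes the session.
        System.out.println(firingWatermarks(1596092987722L, 60_000L)); // prints 0
    }
}
```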
Upvotes: 0