kursk.ye

Reputation: 539

Why does a Flink event-time session window join not emit anything?

I'm losing my mind. I have spent 10 hours on this and it's still not working!

I am using a Flink session window to join two streams.

The job uses event time, and the session window join matches the two streams on a common value.

The code is as follows:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.setParallelism(1);

    // current time
    long currentTime = System.currentTimeMillis();
    // start time (in milliseconds) from which the DataHub topic is consumed
    final long START_TIME_MS = 0L;
    // session window gap
    final Time WIN_GAP_TIME = Time.seconds(10L);
    // maxOutOfOrderness (in milliseconds) used for watermarking
    final Time maxOutOfOrderness = Time.milliseconds(5L);

    // init source function
    DatahubSourceFunction oldTableASourceFun =
        new DatahubSourceFunction(
            endPoint,
            projectName,
            topicOldTableA,
            accessId,
            accessKey,
            // currentTime,
            START_TIME_MS,
            Long.MAX_VALUE,
            20,
            1000,
            20);

    DatahubSourceFunction tableBSourceFun =
        new DatahubSourceFunction(
            endPoint,
            projectName,
            topicTableB,
            accessId,
            accessKey,
            START_TIME_MS,
            Long.MAX_VALUE,
            20,
            1000,
            20);

    // init source
    DataStream<OldTableA> oldTableADataStream =
        env.addSource(oldTableASourceFun)
            .flatMap(
                new FlatMapFunction<List<RecordEntry>, OldTableA>() {
                  @Override
                  public void flatMap(List<RecordEntry> list, Collector<OldTableA> out)
                      throws Exception {
                    for (RecordEntry recordEntry : list) {
                      out.collect(CommonUtils.convertToOldTableA(recordEntry));
                    }
                  }
                })
            .uid("oldTableADataSource")
            .setParallelism(1)
            .returns(new TypeHint<OldTableA>() {})
            .assignTimestampsAndWatermarks(new ExtractorWM<OldTableA>(maxOutOfOrderness));

    DataStream<TableB> tableBDataStream =
        env.addSource(tableBSourceFun)
            .flatMap(
                new FlatMapFunction<List<RecordEntry>, TableB>() {
                  @Override
                  public void flatMap(java.util.List<RecordEntry> list, Collector<TableB> out)
                      throws Exception {
                    for (RecordEntry recordEntry : list) {
                      out.collect(CommonUtils.convertToTableB(recordEntry));
                    }
                  }
                })
            .uid("tableBDataSource")
            .setParallelism(1)
            .returns(new TypeHint<TableB>() {})
            .assignTimestampsAndWatermarks(new ExtractorWM<TableB>(maxOutOfOrderness));

The ExtractorWM code is as follows:

public class ExtractorWM<T extends CommonPOJO> extends BoundedOutOfOrdernessTimestampExtractor<T> {

  public ExtractorWM(Time maxOutOfOrderness) {
    super(maxOutOfOrderness);
  }

  @Override
  public long extractTimestamp(T element) {
    /* it's ok    System.out.println(element +"-"+    CommonUtils.getSimpleDateFormat().format(getCurrentWatermark().getTimestamp()));*/ 
    return System.currentTimeMillis();
  }
}

I tested the code above and its output is correct; the events and watermarks are right:

// oldTableADataStream events and watermark timestamps
OldTableA{PA1=1, a2='a20', **fa3=20**, fa4=30} 1596092987721
OldTableA{PA1=2, a2='a20', **fa3=20**, fa4=31} 1596092987721
OldTableA{PA1=3, a2='a20', **fa3=20**, fa4=32} 1596092987721
OldTableA{PA1=4, a2='a20', **fa3=20**, fa4=33} 1596092987721
OldTableA{PA1=5, a2='a20', **fa3=20**, fa4=34} 1596092987722

// tableBDataStream events and watermark timestamps
TableB{**PB1=20**, B2='b20', B3='b30'} 1596092987721
TableB{PB1=21, B2='b21', B3='b31'} 1596092987721
TableB{PB1=22, B2='b22', B3='b32'} 1596092987721
TableB{PB1=23, B2='b23', B3='b33'} 1596092987722
TableB{PB1=24, B2='b24', B3='b34'} 1596092987722

I expect the result to be:

1   a20 20  b20 b30 30
2   a20 20  b20 b30 31
3   a20 20  b20 b30 32
4   a20 20  b20 b30 33
5   a20 20  b20 b30 34
6   a20 20  b20 b30 35

But the join operator does not work:

    DataStream<NewTableA> join1 =
        oldTableADataStream
            .join(tableBDataStream)
            .where(t1 -> t1.getFa3())  // printing the key here shows the correct value
            .equalTo(t2 -> t2.getPb1())  // printing the key here shows the correct value
            .window(EventTimeSessionWindows.withGap(WIN_GAP_TIME))
            // .trigger(new TestTrigger())
            // .allowedLateness(Time.seconds(2))
            .apply(new oldTableAJoinTableBFunc()); // the join method is never called

    join1.print();  // prints nothing

The oldTableAJoinTableBFunc code is as follows:

public class oldTableAJoinTableBFunc implements JoinFunction<OldTableA, TableB, NewTableA> {


  @Override
  public NewTableA join(OldTableA oldTableA, TableB tableB) throws Exception {

    // Not working: a breakpoint set on this line is never hit when debugging.
    System.out.println(
        oldTableA
            + " - "
            + tableB
            + " - "
            + CommonUtils.getSimpleDateFormat().format(System.currentTimeMillis())); 

    NewTableA newTableA = new NewTableA();

    newTableA.setPA1(oldTableA.getPa1());
    newTableA.setA2(oldTableA.getA2());
    newTableA.setFA3(oldTableA.getFa3());
    newTableA.setFA4(oldTableA.getFa4());
    newTableA.setB2(tableB.getB2());
    newTableA.setB3(tableB.getB3());

    return newTableA;
  }
}

The problem I see is in apply(new oldTableAJoinTableBFunc()): I set a breakpoint in the join method and ran the debugger, but the breakpoint is never hit, so the join method is never called. From reading the source code, join should be called whenever a matching pair occurs. When I print t1.getFa3() and t2.getPb1(), at least one pair has the equal value 20, so why is join not called?

Upvotes: 0

Views: 1004

Answers (2)

David Anderson

Reputation: 43439

Your approach to handling time and watermarking is why this isn't working. For a more in-depth introduction to the topic, see the section of the Flink training course that covers event time and watermarking.

The motivation behind event time processing is to be able to implement consistent, deterministic streaming analytics despite events arriving out-of-order and being processed at some unknown rate. This depends on the events carrying timestamps, and on those timestamps being extracted from the events by a timestamp assigner. Your timestamp assigner is returning System.currentTimeMillis, which effectively disables all of the event time machinery. Moreover, because you are using System.currentTimeMillis as the source of timing information your events can not be out-of-order, yet you are specifying a watermarking delay of 5 msec.

I doubt your job even runs for 5 msec, so it may not be generating any watermarks at all. Plus, it will actually take 200 msec before Flink sends the first watermark (see below).

For a session to end, there will have to be a 10 second interval during which no events are processed. (If you switch to using proper event time timestamps, then those timestamps will need a gap of 10+ seconds, but since you are using System.currentTimeMillis as the source of timing info, your job needs a gap of 10 real-time seconds to close a session.)

A BoundedOutOfOrdernessTimestampExtractor generates watermarks by observing the timestamps in the stream. Every 200 msec (by default) it injects a Watermark into the stream whose value is computed by taking the largest timestamp seen so far in the event stream and subtracting the bounded delay (5 msec) from it. A 10 second long event time session window will only close when a Watermark arrives that is at least 10 seconds later than the timestamp of the latest event currently in the session. For such a watermark to be created, a suitable event with a sufficiently large timestamp has to have been processed.
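To make the arithmetic concrete, here is a minimal plain-Java sketch of the rule described above. It does not use Flink at all: the class name WatermarkSketch and both helper methods are made up for illustration, and the closing condition is the simplified one from the prose (Flink's exact firing boundary may differ by a millisecond). The delay, gap, and last timestamp are taken from the question.

```java
/** Plain-Java model (no Flink dependency) of the watermark arithmetic described above. */
final class WatermarkSketch {

    /** Watermark = largest timestamp seen so far minus the bounded delay. */
    static long watermark(long maxTimestampSeenMs, long maxOutOfOrdernessMs) {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    /**
     * Simplified closing rule taken from the prose: the session can close once the
     * watermark is at least gapMs later than the last event in the session.
     */
    static boolean sessionCanClose(long watermarkMs, long lastEventMs, long gapMs) {
        return watermarkMs >= lastEventMs + gapMs;
    }

    public static void main(String[] args) {
        long delayMs = 5L;                  // maxOutOfOrderness from the question
        long gapMs = 10_000L;               // WIN_GAP_TIME from the question
        long lastEventMs = 1596092987722L;  // last timestamp printed in the question

        // Right after the last event, the watermark trails it by the delay.
        long wm = watermark(lastEventMs, delayMs);
        System.out.println("watermark=" + wm
            + " canClose=" + sessionCanClose(wm, lastEventMs, gapMs));

        // Hypothetical later event, arriving gap + delay after the last one.
        long laterEventMs = lastEventMs + gapMs + delayMs;
        long laterWm = watermark(laterEventMs, delayMs);
        System.out.println("watermark=" + laterWm
            + " canClose=" + sessionCanClose(laterWm, lastEventMs, gapMs));
    }
}
```

In this model the first watermark can never close the session, since it is always behind the last event. Only a later event pushes the watermark far enough forward.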

Upvotes: 2

kursk.ye

Reputation: 539

I found the answer!

The cause is DatahubSourceFunction, which consumes a DataHub topic (DataHub is an Aliyun service similar to Kafka) and emits the records into Flink. When no records are being consumed, the same watermark timestamp is emitted over and over again.

I use BoundedOutOfOrdernessTimestampExtractor to generate watermarks, and it derives each watermark from the timestamps extracted from events. The watermark therefore only advances when an event arrives; while there are no events, it keeps the same value.

Once DatahubSourceFunction has consumed the last record and emitted the last event, no more events are emitted.

From then on the watermark stays at the timestamp of the last event (System.currentTimeMillis() at that moment) minus the 5 ms out-of-orderness bound. The session window never ends, because every watermark is less than the timestamp of the last event plus the window gap.

Because the session window never ends, the join function is never called.
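The stall can be reproduced without Flink in a few lines of plain Java. This is only a model under stated assumptions: the class IdleSourceSketch and its methods are hypothetical, and clockDrivenWatermark is one possible workaround I am sketching, not something the source function or Flink provides under that name. A clock-driven watermark is only reasonable here because the job already stamps events with System.currentTimeMillis(); with genuine event timestamps it would not be appropriate.

```java
/** Plain-Java model (no Flink dependency) of a watermark that stalls when the source is idle. */
final class IdleSourceSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeenMs;

    IdleSourceSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that subtracting the delay cannot underflow.
        this.maxTimestampSeenMs = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Called for every event; only events can move the maximum forward. */
    void onEvent(long timestampMs) {
        maxTimestampSeenMs = Math.max(maxTimestampSeenMs, timestampMs);
    }

    /** Event-driven watermark: repeats the same value while no events arrive. */
    long eventDrivenWatermark() {
        return maxTimestampSeenMs - maxOutOfOrdernessMs;
    }

    /** Hypothetical clock-driven alternative: advances with wall-clock time even when idle. */
    long clockDrivenWatermark(long nowMs) {
        return nowMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        IdleSourceSketch sketch = new IdleSourceSketch(5L);
        long lastEventMs = 1596092987722L; // last timestamp printed in the question
        sketch.onEvent(lastEventMs);

        // Simulate periodic watermark emission every 200 ms with no new events.
        for (long nowMs = lastEventMs + 200L; nowMs <= lastEventMs + 1_000L; nowMs += 200L) {
            System.out.println("now=" + nowMs
                + " eventDriven=" + sketch.eventDrivenWatermark()
                + " clockDriven=" + sketch.clockDrivenWatermark(nowMs));
        }
    }
}
```

In the loop the event-driven value never changes, which is the behaviour described above, while the clock-driven value keeps moving and would eventually pass the last event's timestamp plus the gap.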

Upvotes: 0
