archura
archura

Reputation: 1283

Flink session window not working as expected

The session window in Flink is not working as expected on prod env (same logic works on local env). The idea is to emit the count of 'sample_event_two' for a specific user Id & record id incase if there is at least one event of type 'sample_event_one' for the same user Id & record id. ProcessingTimeSessionWindows with session gap of 30 mins is used here and ProcessWindowFunction has the below logic (I am doing a keyby user Id and record Id fields before setting the window size),

  public void process(
      String s,
      Context context,
      Iterable<SampleEvent> sampleEvents,
      Collector<EnrichedSampleEvent> collector)
      throws Exception {

    EnrichedSampleEvent event = null;
    boolean isSampleEventOnePresent = false;
    int count = 0;


    for (SampleEvent sampleEvent : sampleEvents) {

      if (sampleEvent.getEventName().equals("sample_event_one_name")) {

        Logger.info("Received sample_event_one for userId: {}");
        isSampleEventOnePresent = true;

      } else {
        // Calculate the count for sample_event_two
        count++;

        if (Objects.isNull(event)) {
          event = new EnrichedSampleEvent();
          event.setUserId(sampleEvent.getUserId());
        }
      }
    }

    if (isSampleEventOnePresent && Objects.nonNull(event)) {
      Logger.info(
          "Created EnrichedSampleEvent for userId: {} with count: {}",
          event.getUserId(),
          event.getCount());
      collector.collect(event);
    } else if (Objects.nonNull(event)) {
      Logger.info(
          "No sampleOneEvent event found sampleTwoEvent with userId: {}, count: {}",
          event.getUserId(),
          count);
    }
  }

Though there is sample_event_one present in the collection (confirmed by verifying if the log message "Received sample_event_one" was present) and the count is calculated correctly, I don't see any output event getting created. Instead of EnrichedSampleEvent being emitted, I see log message "No sampleOneEvent event found sampleTwoEvent with userID: "123, count: 5". Can someone help me fix this?

Upvotes: 0

Views: 72

Answers (1)

David Anderson
David Anderson

Reputation: 43499

Your ProcessWindowFunction will be called for each key individually. Since the key is a combination of user id and record id, it's not enough to know that "Received sample_event_one" appears in the logs for the same user. Even though it was the same user, it might have had a different record id.

Upvotes: 1

Related Questions