Reputation: 1283
The session window in Flink is not working as expected in the prod environment (the same logic works locally). The idea is to emit the count of 'sample_event_two' events for a specific user id and record id if there is at least one event of type 'sample_event_one' for the same user id and record id. ProcessingTimeSessionWindows with a session gap of 30 minutes is used, and the ProcessWindowFunction has the logic below (I do a keyBy on the user id and record id fields before applying the window):
public void process(
        String s,
        Context context,
        Iterable<SampleEvent> sampleEvents,
        Collector<EnrichedSampleEvent> collector)
        throws Exception {
    EnrichedSampleEvent event = null;
    boolean isSampleEventOnePresent = false;
    int count = 0;
    for (SampleEvent sampleEvent : sampleEvents) {
        if (sampleEvent.getEventName().equals("sample_event_one_name")) {
            Logger.info("Received sample_event_one for userId: {}");
            isSampleEventOnePresent = true;
        } else {
            // Calculate the count for sample_event_two
            count++;
            if (Objects.isNull(event)) {
                event = new EnrichedSampleEvent();
                event.setUserId(sampleEvent.getUserId());
            }
        }
    }
    if (isSampleEventOnePresent && Objects.nonNull(event)) {
        Logger.info(
                "Created EnrichedSampleEvent for userId: {} with count: {}",
                event.getUserId(),
                event.getCount());
        collector.collect(event);
    } else if (Objects.nonNull(event)) {
        Logger.info(
                "No sampleOneEvent event found sampleTwoEvent with userId: {}, count: {}",
                event.getUserId(),
                count);
    }
}
Although a sample_event_one is present in the collection (confirmed by checking that the log message "Received sample_event_one" appears) and the count is calculated correctly, no output event is created. Instead of an EnrichedSampleEvent being emitted, I see the log message "No sampleOneEvent event found sampleTwoEvent with userId: 123, count: 5". Can someone help me fix this?
Upvotes: 0
Views: 72
Reputation: 43499
Your ProcessWindowFunction will be called for each key individually. Since the key is a combination of user id and record id, it's not enough to know that "Received sample_event_one" appears in the logs for the same user. Even though it was the same user, the event might have had a different record id.
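To illustrate the point, here is a minimal plain-Java sketch (no Flink involved) of how grouping by a composite key separates the events. The `Event` class, the `userId|recordId` key format, the record ids, and the name `sample_event_two_name` are all assumptions made for the example, since the question does not show the key selector or the real event values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyGroupingSketch {

    // Hypothetical stand-in for SampleEvent, reduced to the fields that matter here.
    static final class Event {
        final String userId;
        final String recordId;
        final String eventName;

        Event(String userId, String recordId, String eventName) {
            this.userId = userId;
            this.recordId = recordId;
            this.eventName = eventName;
        }
    }

    // Assumed key format; the real key selector is not shown in the question.
    static String compositeKey(Event e) {
        return e.userId + "|" + e.recordId;
    }

    // Mimics the effect of keyBy: each distinct (userId, recordId) pair gets its own group.
    static Map<String, List<Event>> groupByKey(List<Event> events) {
        Map<String, List<Event>> groups = new LinkedHashMap<>();
        for (Event e : events) {
            groups.computeIfAbsent(compositeKey(e), k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    // Same condition as the question's process(): emit only if the group holds
    // at least one sample_event_one and at least one other event.
    static boolean wouldEmit(List<Event> group) {
        boolean onePresent = false;
        int otherCount = 0;
        for (Event e : group) {
            if (e.eventName.equals("sample_event_one_name")) {
                onePresent = true;
            } else {
                otherCount++;
            }
        }
        return onePresent && otherCount > 0;
    }

    public static void main(String[] args) {
        // Same user, but sample_event_one carries a different record id.
        List<Event> events = List.of(
                new Event("123", "rec-A", "sample_event_one_name"),
                new Event("123", "rec-B", "sample_event_two_name"),
                new Event("123", "rec-B", "sample_event_two_name"));
        groupByKey(events).forEach((key, group) ->
                System.out.println(key + " -> size=" + group.size()
                        + ", wouldEmit=" + wouldEmit(group)));
    }
}
```

Here the sample_event_one ends up alone in one group and the two other events in another, so neither group satisfies the emit condition even though all three events belong to user 123. In the actual job, logging the key argument (`s`) alongside each message would show whether the "Received sample_event_one" line and the "No sampleOneEvent event found" line really come from the same key.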
Upvotes: 1