Reputation: 181
I'm experiencing two issues with Kafka Streams' processValues() and suppress operations:
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return inputStream -> inputStream
.processValues(() -> new HeartbeatProcessor()) // sends heartbeat events every second
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(
Duration.ofSeconds(5), // inactivityGap
Duration.ofSeconds(1) // gracePeriod
))
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()
));
}
Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.topic" is null
at org.apache.kafka.streams.processor.internals.ProcessorRecordContext.serialize(ProcessorRecordContext.java:97)
Note: When using process() instead of processValues(), session windows close properly but I want to avoid repartitioning.
public class HeartbeatProcessor implements FixedKeyProcessor<String, String, String> {
private FixedKeyProcessorContext<String, String> context;
@Override
public void init(FixedKeyProcessorContext<String, String> context) {
this.context = context;
context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, this::generateHeartbeat);
}
@Override
public void process(FixedKeyRecord<String, String> record) {
context.forward(record);
}
private void generateHeartbeat(long timestamp) {
if (context instanceof InternalProcessorContext internalContext) {
internalContext.setRecordContext(
new ProcessorRecordContext(
timestamp,
0L,
context.taskId().partition(),
"dummy-topic",
new RecordHeaders()
)
);
}
Record<String, String> record = new Record<>(
"heartbeat-" + timestamp, // different key every second
"heartbeat",
timestamp
);
context.forward(InternalFixedKeyRecordFactory.create(record));
}
}
I'm using processValues() instead of process() to avoid repartitioning, and sending heartbeat events with different keys every second to trigger session closures, but windows aren't closing consistently.
Version: org.apache.kafka:kafka-streams:3.7.0
Upvotes: 3
Views: 26
Reputation: 62330
but windows aren't closing consistently.
Can you describe in more detail what you observe, and what you expect to get?
Btw: Given that you are using internals from Kafka Streams, there is no guarantee that this will actually work... You should strictly avoid using anything internal, as it might change at any point (syntactically, as well as semantically).
I believe that is also why you get a NPE to begin with -- using context.forward
inside a Punctuator
in a processValue
is not officially supported: https://issues.apache.org/jira/browse/KAFKA-16585
Upvotes: 0