suno3

Reputation: 181

Kafka Streams: NPE in ProcessorRecordContext and Suppress issues with processValues()

I'm running into two issues when combining Kafka Streams' processValues() with suppress():

  1. Getting NPE when using processValues():
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return inputStream -> inputStream
        .processValues(() -> new HeartbeatProcessor())  // sends heartbeat events every second
        .groupByKey()
        .windowedBy(SessionWindows.ofInactivityGapAndGrace(
            Duration.ofSeconds(5),  // inactivityGap
            Duration.ofSeconds(1)   // gracePeriod
        ))
        .aggregate(...)
        .suppress(Suppressed.untilWindowCloses(
            Suppressed.BufferConfig.unbounded()
        ));
}
Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "this.topic" is null
   at org.apache.kafka.streams.processor.internals.ProcessorRecordContext.serialize(ProcessorRecordContext.java:97)

Note: when I use process() instead of processValues(), the session windows close properly, but process() causes a repartition before groupByKey(), which I want to avoid.

  2. Even after working around the NPE by setting a topic name on the ProcessorRecordContext, the suppressed session windows don't close properly:
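import java.time.Duration;

import org.apache.kafka.common.header.internals.RecordHeaders;                 // internal package
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;   // internal class
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;  // internal package
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;    // internal package

public class HeartbeatProcessor implements FixedKeyProcessor<String, String, String> {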
    private FixedKeyProcessorContext<String, String> context;

    @Override
    public void init(FixedKeyProcessorContext<String, String> context) {
        this.context = context;
        context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, this::generateHeartbeat);
    }

    @Override
    public void process(FixedKeyRecord<String, String> record) {
        context.forward(record);
    }

    private void generateHeartbeat(long timestamp) {
        if (context instanceof InternalProcessorContext internalContext) {
            internalContext.setRecordContext(
                new ProcessorRecordContext(
                    timestamp,
                    0L,
                    context.taskId().partition(),
                    "dummy-topic",
                    new RecordHeaders()
                )
            );
        }
        
        Record<String, String> record = new Record<>(
            "heartbeat-" + timestamp,  // different key every second
            "heartbeat",
            timestamp
        );
        context.forward(InternalFixedKeyRecordFactory.create(record));
    }
}

I'm using processValues() instead of process() to avoid repartitioning, and sending heartbeat events with different keys every second to trigger session closures, but windows aren't closing consistently.
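For reference, here is a simplified model of when a suppressed session window is emitted. It assumes, based on my reading of Kafka Streams rather than anything stated in this thread, that untilWindowCloses() on a session-windowed aggregate emits a session [start, end] once the stream time observed by the suppression step reaches end + inactivityGap + gracePeriod, and that this stream time is tracked per task and only moves forward with the timestamps of records that reach that step. With the 5 s gap and 1 s grace above, a session ending at 2000 ms would be emitted only after some record in the same task pushes stream time to 8000 ms. SessionSuppressModel, isFinal and emitFinal are hypothetical names; this is plain Java, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of suppress(untilWindowCloses(...)) on session windows.
// Assumption: a session is final once stream time >= end + inactivityGap + grace.
public class SessionSuppressModel {

    // A buffered session window [start, end] for one key.
    record Session(String key, long start, long end) {}

    static final long INACTIVITY_GAP_MS = 5_000; // ofInactivityGapAndGrace(5s, ...)
    static final long GRACE_MS = 1_000;          // ofInactivityGapAndGrace(..., 1s)

    static boolean isFinal(Session s, long streamTime) {
        return streamTime >= s.end() + INACTIVITY_GAP_MS + GRACE_MS;
    }

    // Replays the record timestamps seen by one task; stream time never moves backwards.
    // Returns the sessions that would be emitted, in emission order.
    static List<Session> emitFinal(List<Session> buffered, long[] timestamps) {
        List<Session> pending = new ArrayList<>(buffered);
        List<Session> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            Iterator<Session> it = pending.iterator();
            while (it.hasNext()) {
                Session s = it.next();
                if (isFinal(s, streamTime)) {
                    emitted.add(s);
                    it.remove();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Session> buffered = List.of(new Session("user-1", 0, 2_000));
        // A heartbeat at 7999 ms is not enough to close the session: prints 0
        System.out.println(emitFinal(buffered, new long[] {2_000, 7_999}).size());
        // One at 8000 ms (2000 + 5000 + 1000) is: prints 1
        System.out.println(emitFinal(buffered, new long[] {2_000, 8_000}).size());
    }
}
```

In this model, a heartbeat that is punctuated in a different task, or whose wall-clock timestamp lags behind the event time of the data, closes nothing, which is one way "not closing consistently" could show up.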

Version: org.apache.kafka:kafka-streams:3.7.0

Upvotes: 3


Answers (1)

Matthias J. Sax

Reputation: 62330

Regarding "but windows aren't closing consistently": can you describe in more detail what you observe, and what you expect to see?

By the way: since you are using Kafka Streams internals (InternalProcessorContext, ProcessorRecordContext, InternalFixedKeyRecordFactory), there is no guarantee that this will actually work. You should strictly avoid anything internal, as it might change at any point, both syntactically and semantically.

I believe that is also why you get the NPE in the first place: calling context.forward() inside a punctuator in processValues() is not officially supported; see https://issues.apache.org/jira/browse/KAFKA-16585
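To make the key issue concrete: processValues() tells Kafka Streams the key is unchanged, so the downstream groupByKey() adds no repartition topic, whereas after process() the key may have changed and a record may have to move to the partition that owns its new key. The heartbeat records get a new key ("heartbeat-" + timestamp), so they are aggregated in whichever task punctuated them, whether or not that key maps to that task's partition. The sketch below expresses that check; RepartitionCheck, partitionFor and needsRepartition are hypothetical names, and partitionFor is a stand-in hash, not Kafka's default partitioner (which applies murmur2 to the serialized key), so any concrete partition numbers it prints are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of why a key change normally needs a repartition.
// partitionFor() is a stand-in hash, NOT Kafka's murmur2-based partitioner.
public class RepartitionCheck {

    static int partitionFor(String key, int numPartitions) {
        int h = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(h, numPartitions);
    }

    // A record handled by the task for `taskPartition` can be grouped by key in place
    // only if its (possibly new) key maps back to that same partition.
    static boolean needsRepartition(String key, int taskPartition, int numPartitions) {
        return partitionFor(key, numPartitions) != taskPartition;
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        int taskPartition = 0;
        for (long ts = 1_000; ts <= 5_000; ts += 1_000) {
            String key = "heartbeat-" + ts;
            System.out.printf("%s -> partition %d, needs repartition from task %d: %b%n",
                    key, partitionFor(key, numPartitions), taskPartition,
                    needsRepartition(key, taskPartition, numPartitions));
        }
    }
}
```

With processValues() the topology effectively assumes needsRepartition() is always false, which only holds if the key really stays the same.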

Upvotes: 0
