Ahmed A

Reputation: 3662

Using keyBy() vs reinterpretAsKeyedStream() when reading from Kafka

I have a simple Flink stream processing application (Flink version 1.13). The Flink app reads from Kafka, does stateful processing of the record, then writes the result back to Kafka. After reading from the Kafka topic, I chose to use reinterpretAsKeyedStream() rather than keyBy() to avoid a shuffle, since the records are already partitioned in Kafka. The key used to partition in Kafka is a String field of the record (using the default Kafka partitioner). The Kafka topic has 24 partitions.
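
For context, kafkaSource here is a plain Flink 1.13 Kafka consumer, built roughly along these lines (the topic name, group id, and EnvelopeDeserializationSchema below are illustrative placeholders, not the actual values from the job):

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "envelope-processor");      // placeholder

// Reads Envelope records from the 24-partition topic; the deserialization
// schema is a hypothetical DeserializationSchema<Envelope> for the records.
FlinkKafkaConsumer<Envelope> kafkaSource = new FlinkKafkaConsumer<>(
        "envelopes", new EnvelopeDeserializationSchema(), props);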

The mapping class is defined as follows. It keeps track of the state of the record.

public class EnvelopeMapper extends
        KeyedProcessFunction<String, Envelope, Envelope> {
   ...
}
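
Since the body is elided above, here is a minimal sketch of what a stateful mapper of this shape could look like; the lastSeen state and its use are purely illustrative, not the actual implementation:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class EnvelopeMapper extends
        KeyedProcessFunction<String, Envelope, Envelope> {

    // Hypothetical per-key state; the real EnvelopeMapper's state is not shown.
    private transient ValueState<Envelope> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", Envelope.class));
    }

    @Override
    public void processElement(Envelope value, Context ctx,
                               Collector<Envelope> out) throws Exception {
        lastSeen.update(value); // state is scoped to the current key (Envelope::getId)
        out.collect(value);
    }
}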

The processing of the record is as follows:

        DataStream<Envelope> messageStream =
                env.addSource(kafkaSource);

        DataStreamUtils.reinterpretAsKeyedStream(messageStream, Envelope::getId)
                .process(new EnvelopeMapper(parameters))
                .addSink(kafkaSink);

With a parallelism of 1, the code runs fine. With parallelism greater than 1 (e.g., 4), I am running into the following error:

2022-06-12 21:06:30,720 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Custom Source -> Map -> Flat Map -> KeyedProcess -> Map -> Sink: Unnamed (4/4) (7ca12ec043a45e1436f45d4b20976bd7) switched from RUNNING to FAILED on 100.101.231.222:44685-bd10d5 @ 100.101.231.222 (dataPort=37839).
java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=96, endKeyGroup=127} does not contain key group 85

Based on the stack trace, the exception appears to happen when the EnvelopeMapper class validates that the record was sent to the right replica of the mapper object.

When reinterpretAsKeyedStream() is used, how are the records distributed among the different replicas of the EnvelopeMapper?

Thank you in advance, Ahmed.

Update

After feedback from @David Anderson, I replaced reinterpretAsKeyedStream() with keyBy(). The processing of the record is now as follows:

        DataStream<Envelope> messageStream =
                env.addSource(kafkaSource)      // Line x
                   .map(statelessMapper1)
                   .flatMap(statelessMapper2);

        messageStream.keyBy(Envelope::getId)
                     .process(new EnvelopeMapper(parameters))
                     .addSink(kafkaSink);

Is there any difference in performance if keyBy() is done right after reading from Kafka (marked with "Line x") vs. right before the stateful mapper (EnvelopeMapper)?

Upvotes: 2

Views: 1297

Answers (1)

David Anderson

Reputation: 43697

With

reinterpretAsKeyedStream(
    DataStream<T> stream,
    KeySelector<T, K> keySelector,
    TypeInformation<K> typeInfo)

you are asserting that the records are already distributed exactly as they would be if you had instead used keyBy(keySelector). This will not normally be the case with records coming straight out of Kafka. Even if they are partitioned by key in Kafka, the Kafka partitions won't be correctly associated with Flink's key groups.
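
To make that concrete, here is a small sketch of how Flink assigns a key to a key group and a subtask, using the values from the question (parallelism 4, and Flink's default maxParallelism of 128, which is where key groups 0..127 in the error message come from); the key value is illustrative:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupDemo {
    public static void main(String[] args) {
        int maxParallelism = 128;        // Flink default: key groups 0..127
        int parallelism = 4;             // parallelism of the failing job
        String key = "some-envelope-id"; // hypothetical Envelope::getId value

        // Flink: keyGroup = murmurHash(key.hashCode()) % maxParallelism,
        // and each subtask owns a contiguous range of key groups
        // (with parallelism 4, subtask 3 owns 96..127, as in the error).
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
        int subtask = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroup);

        // Kafka's default partitioner instead computes
        // murmur2(serializedKeyBytes) % numPartitions (24 here), so the
        // partition a record lands on says nothing about which Flink
        // subtask owns that key's key group.
        System.out.printf("key=%s -> keyGroup=%d -> subtask=%d%n",
                key, keyGroup, subtask);
    }
}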

reinterpretAsKeyedStream is only straightforwardly useful in cases such as handling the output of a window or process function, where you know that the output records are key-partitioned in a particular way. Using it successfully with Kafka can be very difficult: you must either be very careful in how the data is written to Kafka in the first place, or do something tricky with the keySelector so that the key groups it computes line up with how the keys are mapped to Kafka partitions.

One case where this isn't difficult is if the data is written to Kafka by a Flink job running with the same configuration as the downstream job that is reading the data and using reinterpretAsKeyedStream.

Upvotes: 3
