Yanick Salzmann

Reputation: 1498

Kafka Streams not using serde after repartitioning

My Kafka Streams application consumes from a Kafka topic with the following key-value layout: String.class -> HistoryEvent.class

Printing the topic with the console consumer confirms this:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flow-event-stream-file-service-test-instance --property print.key=true --property key.separator=" -- " --from-beginning
    flow1 --  SUCCESS     #C:\Daten\file-service\in\crypto.p12

"flow1" is the String key and the part after -- is the serialized value.

My flow is set up like this:

    // Source topic: String keys, HistoryEvent values
    KStream<String, HistoryEvent> eventStream = builder.stream(applicationTopicName,
            Consumed.with(Serdes.String(), historyEventSerde));

    // Re-key with a composite key, then keep the latest event per key
    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            .groupByKey()
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));

As far as I can tell, I am telling it to consume the topic with the String and HistoryEvent serdes, since that is what the topic contains. I then re-key the stream with a combined key, which should be stored locally using the serde I provide for HistoryEventKey.class. As I understand it, this causes an additional topic to be created for the new key (it shows up when listing topics in the Kafka container). This is fine.
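The question never shows HistoryEventKeySerde. As a rough, stdlib-only sketch of what its serializer and deserializer have to do, the code below encodes a two-part key as length-prefixed UTF-8 strings and decodes it again. It assumes, hypothetically, that HistoryEventKey holds two Strings (the original record key and the identifier); a real serde would wrap these two methods in Kafka's Serializer and Deserializer interfaces. The encoding should be deterministic, because records written to the repartition topic are assigned to partitions by hashing the serialized key bytes, so equal keys must always produce equal bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Hypothetical shape of HistoryEventKey: the original record key plus the event identifier.
final class HistoryEventKey {
    final String key;
    final String identifier;

    HistoryEventKey(String key, String identifier) {
        this.key = Objects.requireNonNull(key);
        this.identifier = Objects.requireNonNull(identifier);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof HistoryEventKey)) return false;
        HistoryEventKey other = (HistoryEventKey) o;
        return key.equals(other.key) && identifier.equals(other.identifier);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, identifier);
    }
}

// Byte layout a HistoryEventKeySerde could use: [len][key][len][identifier], UTF-8.
final class HistoryEventKeyCodec {
    static byte[] serialize(HistoryEventKey k) {
        if (k == null) return null; // a null key must stay null
        byte[] a = k.key.getBytes(StandardCharsets.UTF_8);
        byte[] b = k.identifier.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES * 2 + a.length + b.length)
                .putInt(a.length).put(a)
                .putInt(b.length).put(b)
                .array();
    }

    static HistoryEventKey deserialize(byte[] bytes) {
        if (bytes == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] a = new byte[buf.getInt()];
        buf.get(a);
        byte[] b = new byte[buf.getInt()];
        buf.get(b);
        return new HistoryEventKey(new String(a, StandardCharsets.UTF_8),
                new String(b, StandardCharsets.UTF_8));
    }
}
```

Length-prefixing (rather than joining with a separator character) keeps the round trip unambiguous even if either part contains the separator.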

The problem is that the application fails to start, even in a clean environment with only that one record in the topic:

    org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=flow-event-stream-file-service-test-instance, partition=0, offset=0
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: HistoryEventSerializer) is not compatible to the actual key or value type (key type: HistoryEventKey / value type: HistoryEvent). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.

The message does not make it clear where exactly the problem is. It mentions my source topic, but that cannot be right, because the key there is not of type HistoryEventKey. And since I provide a serde for HistoryEventKey in reduce, it cannot be the local store either.

The only explanation I can see is that it is related to the selectKey operation, which causes repartitioning through a new topic. However, I cannot figure out how to provide the serde for that operation. I do not want to set it as the default, because it is not the default key serde.

Upvotes: 2

Views: 1993

Answers (2)

deddu

Reputation: 895

I encountered a very similar error message, although I had joins instead of groupBys. I'm posting here for the next person who searches for it.

    org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic my-processor-KSTREAM-MAP-0000000023-repartition. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.mycorp.mySession). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).

As in the original question, I did not want to change the default serdes.

In my case the solution was to pass a Joined instance to the join, which lets you pass in the serdes. Note that the error message points to a repartition-MAP-... topic, which is a bit of a red herring, because the fix goes somewhere else: the map only marks the stream for repartitioning, and the repartition topic itself is created by the downstream join.

How I fixed it (a Joined example):

    // ...omitted...

    // map() changes the key, so Kafka Streams marks the stream for repartitioning,
    // but map() itself does not accept a serde.
    KStream<String, MySession> mySessions = myStream
        .map((k, v) -> {
          MySession s = new MySession(v);
          return new KeyValue<>(s.makeKey(), s);
        });

    // The repartition topic is inserted in front of the join below, so the serdes
    // passed via Joined are the ones used to write it.
    // MyInfo is a placeholder for the value type of myTable.
    return mySessions
        .leftJoin(
            myTable,
            (session, info) -> {
              session.infos = info;
              return session;
            },
            Joined.<String, MySession, MyInfo>as("my_enriched_session")
                .withKeySerde(Serdes.String())
                .withValueSerde(new MySessionSerde())
        );
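A caveat when building Joined this way: in Kafka Streams, Joined.as(...), Joined.keySerde(...) and Joined.valueSerde(...) are static factory methods, while withKeySerde(...), withValueSerde(...) and withName(...) are the instance methods meant for chaining. Java lets you call a static method through an instance expression, but the result ignores that instance, so a chain such as Joined.as(...).keySerde(...).valueSerde(...) keeps only the last call. The plain-Java sketch below reproduces the effect with a hypothetical Options class (not a Kafka type) that mirrors the same static/instance split.

```java
// Hypothetical builder that mirrors the static/instance split of Joined; not a Kafka class.
final class Options {
    final String name;
    final String key;
    final String value;

    private Options(String name, String key, String value) {
        this.name = name;
        this.key = key;
        this.value = value;
    }

    // Static factories: each one starts from an empty configuration.
    static Options as(String n) { return new Options(n, null, null); }
    static Options keySerde(String k) { return new Options(null, k, null); }
    static Options valueSerde(String v) { return new Options(null, null, v); }

    // Instance methods: copy this configuration and change one field.
    Options withKeySerde(String k) { return new Options(name, k, value); }
    Options withValueSerde(String v) { return new Options(name, key, v); }
}

final class StaticChainDemo {
    // Compiles, but each static call ignores the instance it is invoked on,
    // so only the last call's setting survives.
    static Options staticChain() {
        return Options.as("my_enriched_session").keySerde("String").valueSerde("MySession");
    }

    // Instance methods carry every setting forward.
    static Options instanceChain() {
        return Options.as("my_enriched_session").withKeySerde("String").withValueSerde("MySession");
    }
}
```

If the default key serde is already String, as in the error message above, a dropped key serde can go unnoticed, which may be why such a chain appears to work.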

Upvotes: 0

Yanick Salzmann

Reputation: 1498

After some more debugging I found that the new topic is created in the groupByKey step, not in selectKey itself. groupByKey accepts a Grouped instance, which lets you specify the serdes for the key and the value:

    eventStream.selectKey((key, value) -> new HistoryEventKey(key, value.getIdentifier()))
            // The repartition topic is created at this step, so Grouped supplies its serdes
            .groupByKey(Grouped.with(new HistoryEventKeySerde(), new HistoryEventSerde()))
            .reduce((e1, e2) -> e2,
                    Materialized.<HistoryEventKey, HistoryEvent, KeyValueStore<Bytes, byte[]>>as(streamByKeyStoreName)
                            .withKeySerde(new HistoryEventKeySerde()));
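Why the mistake only shows up at runtime: the default serdes are configured by class name in the StreamsConfig, so their generic type is lost and the compiler cannot check that a StringSerializer only ever receives Strings. When the repartition sink hands it a HistoryEventKey, the cast inside the serializer fails with a ClassCastException, which Kafka Streams reports roughly as the "not compatible" StreamsException in the question (the other answer's message names the ClassCastException explicitly). The stdlib-only sketch below reproduces that mechanism with a hypothetical Ser interface standing in for Kafka's Serializer; it is an illustration, not Kafka code.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Serializer<T>; not the real API.
interface Ser<T> {
    byte[] serialize(T data);
}

final class StringSer implements Ser<String> {
    @Override
    public byte[] serialize(String data) {
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

final class ErasureDemo {
    // Hypothetical composite key, standing in for HistoryEventKey.
    static final class CompositeKey {
        final String flow;
        final String id;

        CompositeKey(String flow, String id) {
            this.flow = flow;
            this.id = id;
        }
    }

    // Mimics a serde looked up by class name from configuration:
    // the caller gets an untyped serializer, so nothing is checked at compile time.
    @SuppressWarnings("unchecked")
    static Ser<Object> defaultKeySerializer() {
        return (Ser<Object>) (Ser<?>) new StringSer();
    }

    // Serializes a key the way a sink would and reports what happened.
    static String trySerialize(Object key) {
        try {
            defaultKeySerializer().serialize(key);
            return "ok";
        } catch (ClassCastException e) {
            // The compiler-generated bridge method casts the argument to String.
            return "ClassCastException";
        }
    }
}
```

Passing explicit serdes (via Grouped, Joined, or Consumed/Produced) avoids relying on the untyped default for the re-keyed stream.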

Upvotes: 4
