Reputation: 113
I am trying to join a stream with a GlobalKTable and am getting a serialization error.
I am joining productStream, a KStream<String, String>,
with productTable, a GlobalKTable<String, Product>
(Product is an Avro message).
The following code performs the join:
KStream<String, Product> detailedProductStream = productStream.join(
productTable,
(productStreamKey, productStreamValue) -> productStreamValue,
(productStreamValue, productTableValue) -> productTableValue
);
I defined a SpecificAvroSerde for the Avro message and Serdes.String() for the String-typed messages, but I am getting the exception below. Any clue? Thanks
Exception in thread "web-click-charts-a086d30c-5f8d-4e1f-8e3e-ab4b2ac8f03a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000006, topic=PRODUCT-CURRENT, partition=0, offset=2, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:175)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.keyBytes(MeteredWindowStore.java:222)
at org.apache.kafka.streams.state.internals.MeteredWindowStore.fetch(MeteredWindowStore.java:167)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$WindowStoreReadWriteDecorator.fetch(ProcessorContextImpl.java:547)
at org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin$KStreamKStreamJoinProcessor.process(KStreamKStreamJoin.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
The serdes are defined as follows:
Map<String, String> props = new HashMap<>();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
SpecificAvroSerde<Product> productSerde = new SpecificAvroSerde<>();
productSerde.configure(props, false);
Serde<String> stringSerde = Serdes.String();
GlobalKTable<String, Product> productTable = builder.globalTable("TOPIC-1",
Consumed.with(stringSerde, productSerde));
KStream<String, String> productStream = builder.stream("TOPIC-2",
Consumed.with(stringSerde, stringSerde));
Upvotes: 1
Views: 2240
Reputation: 62330
How/where did you specify the serdes? Your code snippet does not show this part.
However, the stack trace indicates that the issue is not with a stream-globalTable join but with a windowed stream-stream join (the stack trace shows KStreamKStreamJoin), and the error message shows that ByteArraySerializer is used for a String key.
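As a rough sketch of the two remedies the error message names (providing serdes via method parameters, or changing the default serdes), something like the following may help. It assumes Kafka Streams 3.0 or later, where JoinWindows.ofTimeDifferenceWithNoGrace is available; older releases would use JoinWindows.of instead. The class name, topic names, five-minute window, and joiner are placeholders, since the actual stream-stream join was not posted.

```java
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class StreamStreamJoinSerdes {

    // Option 1: hand the serdes to the join itself, so its window stores
    // do not fall back to the configured default serdes.
    static Topology buildTopology() {
        Serde<String> stringSerde = Serdes.String();
        StreamsBuilder builder = new StreamsBuilder();

        // "LEFT-TOPIC", "RIGHT-TOPIC" and "JOINED-TOPIC" are placeholder names.
        KStream<String, String> left =
                builder.stream("LEFT-TOPIC", Consumed.with(stringSerde, stringSerde));
        KStream<String, String> right =
                builder.stream("RIGHT-TOPIC", Consumed.with(stringSerde, stringSerde));

        KStream<String, String> joined = left.join(
                right,
                // Placeholder joiner: concatenates the two values.
                (leftValue, rightValue) -> leftValue + "|" + rightValue,
                // Placeholder window: records within 5 minutes of each other join.
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
                // Key serde, left value serde, right value serde.
                StreamJoined.with(stringSerde, stringSerde, stringSerde));

        joined.to("JOINED-TOPIC", Produced.with(stringSerde, stringSerde));
        return builder.build();
    }

    // Option 2: change the default serdes, which are used wherever
    // an operator is not given explicit serdes.
    static Properties defaultSerdeConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        return props;
    }
}
```

If one side of the join carries the Avro type, the configured SpecificAvroSerde<Product> would take the place of the corresponding value serde in StreamJoined.with.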
Upvotes: 2