Reputation: 11
I am trying a very simple KTable-to-KTable join in Scala. Both topics have a single partition, but I still don't see anything coming to the 'output-topic':
import org.apache.kafka.streams.kstream.KTable
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}

object SimpleMerge extends App {
  val properties = KafkaProperties.get()

  val streamBuilder = new StreamsBuilder()
  val objectMetadataTable: KTable[Long, String] = streamBuilder.table("metadata-topic")
  val objectClassificationTable: KTable[Long, String] = streamBuilder.table("classification-topic")

  val objectClassificationWithMetadata: KTable[Long, String] = objectMetadataTable
    .join(objectClassificationTable, (metadata: String, classification: String) => metadata + classification)

  objectClassificationWithMetadata.toStream().to("output-topic")

  val kafkaStreams = new KafkaStreams(streamBuilder.build(), properties)
  kafkaStreams.cleanUp()
  kafkaStreams.start()

  Runtime.getRuntime.addShutdownHook(new Thread(() => {
    kafkaStreams.close()
  }))
}
I am sure this is very basic, but I cannot find any clue. Note: I can see that both input topics are producing records with the same, correct keys, so null keys are already ruled out.
Thanks
Exception
02:26:57.936 [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - stream-thread [MetadataClassificationMerger_1-757746d5-32e5-412b-934d-d29c1fc8c7a9-StreamThread-1] task [0_0] Failed to flush state store metadata-topic-STATE-STORE-0000000000:
org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: [B, and value: org.apache.kafka.streams.kstream.internals.Change.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:122) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:333) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:276) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:177) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:582) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:536) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:524) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:226) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:529) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:959) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:813) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-2.4.0.jar:?]
Caused by: java.lang.ClassCastException: [B cannot be cast to java.lang.String
at com.pan.pcs.dlp.kafkamerger.MergeObjectMetadataClassification$$anon$1.apply(MergeObjectMetadataClassification.scala:19) ~[classes/:?]
at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:110) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.kstream.internals.KTableKTableInnerJoin$KTableKTableJoinProcessor.process(KTableKTableInnerJoin.java:67) ~[kafka-streams-2.4.0.jar:?]
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-2.4.0.jar:?]
... 24 more
Upvotes: 1
Views: 568
Reputation: 62350
By default, ByteArraySerdes are used, and the error message indicates that your program still uses those instead of LongSerde for keys and StringSerde for values.
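For reference, those defaults come from the StreamsConfig the application is started with; a minimal sketch of overriding them (the application id and bootstrap server below are placeholders):

import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig

val properties = new Properties()
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "metadata-classification-merger") // placeholder
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")              // placeholder
// Without the next two lines, Kafka Streams falls back to the byte-array Serdes,
// which is what the "[B cannot be cast to java.lang.String" above complains about.
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass)
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)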
If you use the official Scala API and import the corresponding implicits, the correct Serdes should be used automatically. Otherwise, you would either need to set different default Serdes via StreamsConfig (as sketched above) or override them on a per-operator basis by passing a Consumed.with(...) into the table(...) operators.
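With the plain Java API that the posted code appears to use, the per-operator variant could look roughly like this (a sketch; only the source and sink calls change):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.{Consumed, KTable, Produced}

val streamBuilder = new StreamsBuilder()

// Tell each source table explicitly how to deserialize keys and values,
// instead of relying on the byte-array defaults.
val objectMetadataTable: KTable[java.lang.Long, String] =
  streamBuilder.table("metadata-topic", Consumed.`with`(Serdes.Long(), Serdes.String()))
val objectClassificationTable: KTable[java.lang.Long, String] =
  streamBuilder.table("classification-topic", Consumed.`with`(Serdes.Long(), Serdes.String()))

val objectClassificationWithMetadata: KTable[java.lang.Long, String] =
  objectMetadataTable.join(objectClassificationTable,
    (metadata: String, classification: String) => metadata + classification)

// The sink needs matching Serdes as well, otherwise writing the joined
// strings to the output topic fails on serialization for the same reason.
objectClassificationWithMetadata.toStream().to("output-topic", Produced.`with`(Serdes.Long(), Serdes.String()))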
Check out the docs for details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#kstreams-dsl-for-scala
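With the kafka-streams-scala module, the whole topology from the question could instead be written roughly like this; the imported implicits derive the Consumed, Produced, and Materialized instances from the Serde[Long] and Serde[String] in scope (a sketch under that assumption):

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable

val builder = new StreamsBuilder()

// The implicit Serde[Long] and Serde[String] imported above are picked up
// automatically, so no explicit Consumed/Produced is needed anywhere.
val metadata: KTable[Long, String] = builder.table[Long, String]("metadata-topic")
val classification: KTable[Long, String] = builder.table[Long, String]("classification-topic")

metadata
  .join(classification)((m, c) => m + c)
  .toStream
  .to("output-topic")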
Upvotes: 0