Stella

Reputation: 1866

Exception when processing data during Kafka stream process

I am working with Kafka Streams using the code below. The filter parses each JSON value and keeps only the records where "UserID" is "1". Please refer to the code below:

builder.<String, String>stream(Serdes.String(), Serdes.String(), topic)
       .filter(new Predicate<String, String>() {

           @Override
           public boolean test(String key, String value) {
               // Local variable, so a failed parse cannot reuse the value of an earlier record
               String userIDCheck = null;

               try {
                   JSONObject jsonObj = new JSONObject(value);

                   userIDCheck = jsonObj.get("UserID").toString();
                   System.out.println("userIDCheck: " + userIDCheck);
               } catch (JSONException e) {
                   e.printStackTrace();
               }

               // Null-safe comparison: records that fail to parse are filtered out
               return "1".equals(userIDCheck);
           }
       })
       .to(streamouttopic);

Sample value: {"UserID":"1","Address":"XXX","AccountNo":"989","UserName":"Stella","AccountType":"YYY"}

I get the below error:

    Exception in thread "SampleStreamProducer-4eecc3ab-858c-44a4-9b8c-5ece2b4ab21a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=testtopic1, partition=0, offset=270
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:89)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)

The value and the filter condition look fine to me, so I can't see why the stream code throws this exception when it runs.

Upvotes: 4

Views: 7123

Answers (1)

Matthias J. Sax

Reputation: 62350

The reported issue should only apply to Kafka 2.0 and older. Since the 2.1.0 release, Kafka Streams supports "serde push down", and the to() operator should inherit the correct serdes from upstream (cf. https://issues.apache.org/jira/browse/KAFKA-7456).

For Kafka 2.0 and older, you have to specify the correct Serdes for the to() operation explicitly. The Serdes passed to stream() apply only to that operator, because the semantics of a serde overwrite is a per-operator "drop-in overwrite". Without its own Serdes, to() falls back to the default Serdes from the StreamsConfig, which is ByteArraySerde, and a String cannot be cast to byte[].
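To make the failure mode concrete, here is a minimal, Kafka-free sketch of what happens in the sink. The `Serializer` interface and the two serializer classes are simplified stand-ins that I made up for illustration (they are not the real Kafka classes); `send` plays the role of a sink that only sees the configured serializer through an erased type.

```java
import java.nio.charset.StandardCharsets;

class SerdeMismatchDemo {

    // Hypothetical stand-in for a serializer interface; NOT the real Kafka type.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Mimics a byte-array serializer: passes byte[] through unchanged.
    static class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    // Mimics a string serializer: encodes the text as UTF-8.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // The sink does not know the value type at compile time, so the
    // type check only happens at runtime inside serialize().
    @SuppressWarnings("unchecked")
    static byte[] send(Serializer<?> serializer, String topic, Object value) {
        return ((Serializer<Object>) serializer).serialize(topic, value);
    }

    // Returns true if sending the value with this serializer throws ClassCastException.
    static boolean failsWithClassCast(Serializer<?> serializer, Object value) {
        try {
            send(serializer, "out", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String value = "{\"UserID\":\"1\"}";
        // Default-style byte[] serializer with a String value: cast fails.
        System.out.println(failsWithClassCast(new ByteArraySerializer(), value)); // prints true
        // Matching String serializer: works.
        System.out.println(failsWithClassCast(new StringSerializer(), value));    // prints false
    }
}
```

The filter itself never touches a serializer, which is why the condition can be correct and the exception still only shows up once a record passes the filter and reaches the sink.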

You need to do:

.to(streamouttopic, Produced.with(Serdes.String(), Serdes.String()));

For even older versions (pre-1.0) that don't have the Produced parameter yet, the code would be:

.to(Serdes.String(), Serdes.String(), streamouttopic);
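The error message also suggests the other route: changing the default Serdes in the config, so that every operator without explicit Serdes uses String. A rough sketch of such a config follows, written with plain string keys so it runs without the Kafka jars. The application id and broker address are assumptions (the id is guessed from the thread name in the stack trace), and as far as I know the `default.key.serde` / `default.value.serde` keys exist from 0.11 on, while older releases used `key.serde` / `value.serde`, so check the StreamsConfig of your version.

```java
import java.util.Properties;

class DefaultSerdeConfigSketch {

    // Builds the properties that would be handed to the Kafka Streams config.
    static Properties streamsProperties() {
        Properties props = new Properties();
        // Assumed values; replace with your own application id and brokers.
        props.put("application.id", "SampleStreamProducer");
        props.put("bootstrap.servers", "localhost:9092");

        // Use the String serde as the default for keys and values,
        // instead of the byte-array default.
        String stringSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
        props.put("default.key.serde", stringSerde);
        props.put("default.value.serde", stringSerde);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        streamsProperties().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

This only makes sense if most of the topology really works on String keys and values; for a single sink, passing the Serdes to to() is the smaller change.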

Upvotes: 10
