Shilpi
Shilpi

Reputation: 119

Getting Class Cast exception in Kafka Stream API

I am producing input data as json string .

For Topic - myinput

{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass"
     :"myClass","id":112553,"Scope":"198S"}

My class looking like this:

public class App {
    static public class CountryMessage {

        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name; 
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");


        StreamsConfig config = new StreamsConfig(getProperties());
        final Serde < String > stringSerde = Serdes.String();
        final Serde < Long > longSerde = Serdes.Long();

        Map < String, Object > serdeProps = new HashMap < > ();
        final Serializer < CountryMessage > countryMessageSerializer = new JsonPOJOSerializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer < CountryMessage > countryMessageDeserializer = new JsonPOJODeserializer < > ();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);
        final Serde < CountryMessage > countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer,countryMessageDeserializer);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, CountryMessage> countriesStream = kStreamBuilder.stream(stringSerde, countryMessageSerde, "vanitopic");

        KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

        KTable<Windowed<String>, Long> aggregatedStream = countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();

        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092,10.106.9.235:9093,10.106.9.235:9094");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.106.9.235:2181");
        //settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return settings;
    }
}

I am getting bellow class cast exception:

Exception in thread "countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5a138ffaddb2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=vanitopic, partition=0, offset=2036 at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203) at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.cisco.streams.countries.App$CountryMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters. at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189) ... 3 more Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:88) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) ... 16 more

I need help to understand how and where to apply custom Serdes those I have created

Upvotes: 2

Views: 10056

Answers (2)

mostafa cs
mostafa cs

Reputation: 738

Add serializers to groupByKey

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey(Grouped.with(Serdes.String(), new ObjectMapperSerde<>(CountryMessage.class)));

Upvotes: 5

Matthias J. Sax
Matthias J. Sax

Reputation: 62285

In your code,

KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

the groupByKey() need to set both serializer as this will trigger a data repartitioning. Or you set default Serded to for String and CountryMessage types.

As mentioned in my comment, every operator that does not use default Serdes from StreamsConfig need to set the correct Serdes.

Thus, also the count() operation need to specify corresponding String and Long Serdes:

countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

All operators that might need Serdes have appropriate overload. Just check out all overload of all operators you are using.

Check out the docs for more details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html

Upvotes: 6

Related Questions