Reputation: 119
I am producing input data as a JSON string.
For topic myinput:
{"time":"2017-11-28T09:42:26.776Z","name":"Lane1","oclass":"myClass","id":112553,"Scope":"198S"}
My class looks like this:
public class App {

    static public class CountryMessage {
        public String time;
        public String Scope;
        public String id;
        public String oclass;
        public String name;
    }

    private static final String APP_ID = "countries-streaming-analysis-app";

    public static void main(String[] args) {
        System.out.println("Kafka Streams Demonstration");
        StreamsConfig config = new StreamsConfig(getProperties());

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        Map<String, Object> serdeProps = new HashMap<>();
        final Serializer<CountryMessage> countryMessageSerializer = new JsonPOJOSerializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageSerializer.configure(serdeProps, false);

        final Deserializer<CountryMessage> countryMessageDeserializer = new JsonPOJODeserializer<>();
        serdeProps.put("JsonPOJOClass", CountryMessage.class);
        countryMessageDeserializer.configure(serdeProps, false);

        final Serde<CountryMessage> countryMessageSerde = Serdes.serdeFrom(countryMessageSerializer, countryMessageDeserializer);

        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream<String, CountryMessage> countriesStream = kStreamBuilder.stream(stringSerde, countryMessageSerde, "vanitopic");

        KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();

        KTable<Windowed<String>, Long> aggregatedStream = countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");

        System.out.println("Starting Kafka Streams Countries Example");
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, config);
        kafkaStreams.start();
        System.out.println("Now started CountriesStreams Example");
    }

    private static Properties getProperties() {
        Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.106.9.235:9092,10.106.9.235:9093,10.106.9.235:9094");
        settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "10.106.9.235:2181");
        //settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        //settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        return settings;
    }
}
I am getting the ClassCastException below:
Exception in thread "countries-streaming-analysis-app-f7f95119-4401-4a6e-8060-5a138ffaddb2-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=vanitopic, partition=0, offset=2036
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.cisco.streams.countries.App$CountryMessage). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:88)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    ... 16 more
I need help understanding how and where to apply the custom Serdes I have created.
Upvotes: 2
Views: 10056
Reputation: 738
Add the Serdes to groupByKey:
KGroupedStream<String, CountryMessage> countries = countriesStream
    .selectKey((k, traffic) -> traffic.Scope)
    .groupByKey(Grouped.with(Serdes.String(), new ObjectMapperSerde<>(CountryMessage.class)));
Upvotes: 5
Reputation: 62285
In your code,
KGroupedStream<String, CountryMessage> countries = countriesStream.selectKey((k, traffic) -> traffic.Scope).groupByKey();
the groupByKey() needs both Serdes to be set, because it triggers a data repartitioning. Alternatively, you can set the default Serdes to the String and CountryMessage types.
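With the 0.11-era DSL used in the question, that would look roughly like the sketch below, assuming the pre-1.0 groupByKey(Serde, Serde) overload (deprecated later in favor of Serialized/Grouped):

```java
// Sketch: pass the key and value Serdes explicitly so the repartition
// topic is written with them instead of the default byte-array Serdes.
KGroupedStream<String, CountryMessage> countries = countriesStream
    .selectKey((k, traffic) -> traffic.Scope)
    .groupByKey(stringSerde, countryMessageSerde);
```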
As mentioned in my comment, every operator that does not use the default Serdes from StreamsConfig needs to be given the correct Serdes explicitly.
Thus, the count() operation also needs the corresponding String and Long Serdes:
countries.count(TimeWindows.of(60 * 1000L), "UserCountStore");
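For comparison, in the newer DSL (1.0+) the same windowed count carries its store Serdes via Materialized, roughly like this sketch (windowedBy() and Materialized replace the old count(Windows, String) overload; TimeWindows.of(Duration) assumes Kafka 2.1+):

```java
// Sketch: newer-API equivalent with explicit Serdes on the window store.
KTable<Windowed<String>, Long> aggregatedStream = countries
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("UserCountStore")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
```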
All operators that might need Serdes have appropriate overloads. Just check the overloads of every operator you are using.
Check out the docs for more details: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html
Upvotes: 6