Reputation: 12417
I have a small Kafka Streams app that counts how many users pick each favourite colour:
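import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class FavouriteColor {
    private static final String INPUT_TOPIC_NAME = "favourite-colour-input";
    private static final String OUTPUT_TOPIC_NAME = "favourite-colour-output";
    private static final String INTERMEDIATE_TOPIC_NAME = "favourite-colour-output";
    private static final String APPLICATION_ID = "favourite-colour-java";

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Default serdes: String keys and String values unless overridden
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Disable record caching so every count update is forwarded immediately
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(INPUT_TOPIC_NAME);

        // Parse "user,colour" lines: key by user, keep only green/blue/red
        KStream<String, String> usersAndColours = textLines
                .filter((key, value) -> value.contains(","))
                .selectKey((key, value) -> value.split(",")[0].toLowerCase())
                .mapValues(value -> value.split(",")[1].toLowerCase())
                .filter((user, colour) -> Arrays.asList("green", "blue", "red").contains(colour));

        // Write user -> colour pairs out, then read them back as a table (latest colour per user)
        usersAndColours.to(INTERMEDIATE_TOPIC_NAME);
        KTable<String, String> usersAndColoursTable = builder.table(INTERMEDIATE_TOPIC_NAME);

        // Re-key by colour and count users per colour
        KTable<String, Long> favouriteColours = usersAndColoursTable
                .groupBy((user, colour) -> new KeyValue<>(colour, colour))
                .count(Named.as("CountsByColours"));

        // Counts are Long, so the output topic is written with a Long value serde
        favouriteColours.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp(); // deletes this application's local state directory
        streams.start();
        System.out.println(streams);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}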
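The topology keeps the latest colour per user in a KTable, so when a user changes colour the old colour's count goes down and the new one goes up. A plain-Java sketch of the counts it should converge to for the inputs stephane,blue, john,green, stephane,red and alice,red (no Kafka dependency; FavouriteColourSimulation, parse and countFavourites are hypothetical names, and it models only the final state, not the individual updates Kafka Streams emits):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FavouriteColourSimulation {

    // Mirrors the filter/selectKey/mapValues/filter steps of the topology.
    // Returns {user, colour}, or null if the line would be dropped.
    static String[] parse(String line) {
        if (!line.contains(",")) {
            return null;
        }
        String[] parts = line.split(",");
        if (parts.length < 2) {
            return null; // e.g. "bob," (the real topology would throw here instead)
        }
        String user = parts[0].toLowerCase();
        String colour = parts[1].toLowerCase();
        if (!Arrays.asList("green", "blue", "red").contains(colour)) {
            return null;
        }
        return new String[] {user, colour};
    }

    // Models KTable groupBy/count semantics: the table holds the latest colour per user,
    // and replacing a user's colour decrements the old colour before incrementing the new one.
    static Map<String, Long> countFavourites(List<String> lines) {
        Map<String, String> colourByUser = new HashMap<>(); // latest value per key, like the KTable
        Map<String, Long> counts = new TreeMap<>();          // sorted for readable output
        for (String line : lines) {
            String[] kv = parse(line);
            if (kv == null) {
                continue;
            }
            String previous = colourByUser.put(kv[0], kv[1]);
            if (previous != null) {
                counts.merge(previous, -1L, Long::sum);
            }
            counts.merge(kv[1], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = List.of("stephane,blue", "john,green", "stephane,red", "alice,red");
        System.out.println(countFavourites(input)); // prints {blue=0, green=1, red=2}
    }
}
```

Note that blue stays in the result with a count of 0: stephane's switch from blue to red is a subtraction, not a deletion.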
I created the topics and started the console producer and consumer from the terminal:
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-input
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic user-keys-and-colours --config cleanup.policy=compact
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic favourite-colour-output --config cleanup.policy=compact
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic favourite-colour-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka-console-producer --bootstrap-server localhost:9092 --topic favourite-colour-input
I then typed the following input into the producer terminal:
stephane,blue
john,green
stephane,red
alice,red
The consumer terminal then showed this error:
stephane Processed a total of 1 messages
[2021-11-27 21:31:58,155] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:26)
at org.apache.kafka.common.serialization.LongDeserializer.deserialize(LongDeserializer.java:21)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:519)
at scala.Option.map(Option.scala:242)
at kafka.tools.DefaultMessageFormatter.deserialize$1(ConsoleConsumer.scala:519)
at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:568)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:115)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
What is the issue here? I did some brief research and found similar questions from other people, but their solutions did not seem to work for me.
Upvotes: 1
Views: 1360
Reputation: 16392
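You defined the console consumer's value deserializer as LongDeserializer, but the records it reads have String values. LongDeserializer only accepts exactly 8 bytes, so a value such as "blue" (4 bytes in UTF-8) fails with "Size of data received by LongDeserializer is not 8".

The String records come from the app itself: INTERMEDIATE_TOPIC_NAME is set to "favourite-colour-output", the same name as OUTPUT_TOPIC_NAME, so usersAndColours.to(INTERMEDIATE_TOPIC_NAME) writes user/colour String pairs into the topic you are consuming as Long. This is consistent with the consumer printing the key stephane (your first input) before failing. Point INTERMEDIATE_TOPIC_NAME at the user-keys-and-colours topic you created instead. The String records already written to favourite-colour-output will stay there, so recreate that topic, or consume it without --from-beginning, before checking the counts again.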
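Serdes.Long() writes each count as 8 big-endian bytes, and LongDeserializer rejects anything of a different length. A JDK-only sketch of that check (LongDecodingSketch and decodeLong are hypothetical names, and it throws IllegalArgumentException where Kafka throws SerializationException) shows why a 4-byte String such as "blue" cannot be read as a count:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongDecodingSketch {

    // Hypothetical stand-in for LongDeserializer: accepts exactly 8 bytes
    // and reads them as a big-endian long.
    static Long decodeLong(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != 8) {
            throw new IllegalArgumentException("Size of data received is not 8 (got " + data.length + ")");
        }
        return ByteBuffer.wrap(data).getLong(); // ByteBuffer is big-endian by default
    }

    public static void main(String[] args) {
        // A count encoded as 8 big-endian bytes decodes fine.
        byte[] countBytes = ByteBuffer.allocate(8).putLong(2L).array();
        System.out.println(decodeLong(countBytes)); // prints 2

        // A colour written with the String serde is just its UTF-8 bytes (4 here).
        byte[] colourBytes = "blue".getBytes(StandardCharsets.UTF_8);
        try {
            decodeLong(colourBytes);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Size of data received is not 8 (got 4)
        }
    }
}
```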
Upvotes: 4