Reputation: 1082
I am trying to implement a simple counter with Kafka Streams. It takes key-value records and adds each new value to the running total for its key (for example, the lines "apple 3" and "apple 4" should produce the key apple with the value 7).
This is the code I have written so far:
package exercises;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class LiveCounter {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "live-counter-2");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
        properties.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.LongSerde.class.getName());
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLine = builder.stream("counter-in");
        KStream<String, Long> words = textLine
                .filter((k, v) -> {
                    try {
                        String[] arr = v.split(" ");
                        Long.parseLong(arr[1]);
                        return true;
                    } catch (NumberFormatException e) {
                        return false;
                    }
                })
                .selectKey((k, v) -> v.split(" ")[0])
                .mapValues((k, v) -> Long.parseLong(v.split(" ")[1]))
                .groupByKey()
                .reduce(Long::sum)
                .toStream();
        words.to("counter-out", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), properties);
        streams.start();
        System.out.println(streams.toString());
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
As you can see, the topology is simple: it takes a text line, re-keys the stream, and finally sums all the values for each key group with the reduce function.
I tried streaming up to mapValues and it was working, but this code block is causing the problem:
.groupByKey()
.reduce(Long::sum)
.toStream();
This is the stack trace:
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.errors.LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: counter-in, partition: 0, offset: 1
org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Shutting down
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from RUNNING to ERROR
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] All stream threads have died. The instance will be in error state and should be closed.
[live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Shutdown complete
Exception in thread "live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:175)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:162)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:765)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:764)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
[Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from ERROR to PENDING_SHUTDOWN
[kafka-streams-close-thread] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67-StreamThread-1] Informed to shut down
[kafka-streams-close-thread] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] State transition from PENDING_SHUTDOWN to NOT_RUNNING
[Thread-1] INFO org.apache.kafka.streams.KafkaStreams - stream-client [live-counter-2-9a694aa5-589d-4d2f-8e1c-ff64b6e05b67] Streams client stopped completely
Any help would be appreciated. Thanks in advance.
Upvotes: 2
Views: 10812
Reputation: 5443
Since your input topic, counter-in, consists of messages whose values are Strings, while your default value serde is LongSerde, you need to tell Kafka Streams to deserialize the value as a string, as follows:

KStream<String, String> textLine = builder.stream("counter-in", Consumed.with(Serdes.String(), Serdes.String()));
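For completeness, here is a minimal sketch of the corrected topology that passes explicit serdes at every stage instead of relying on the defaults. This is only an illustration based on the code in the question: it assumes Kafka Streams 2.1+ (where Grouped is available; on older versions Serialized.with takes its place) and omits the input-validation filter for brevity.

// additional imports needed:
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;

StreamsBuilder builder = new StreamsBuilder();

// Read counter-in as String/String so the source never falls back to the
// default LongSerde, which expects an 8-byte payload and fails on text.
KStream<String, String> textLine = builder.stream("counter-in",
        Consumed.with(Serdes.String(), Serdes.String()));

textLine
        .selectKey((k, v) -> v.split(" ")[0])                      // first token becomes the key
        .mapValues(v -> Long.parseLong(v.split(" ")[1]))           // second token becomes a long
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long())) // explicit serdes for the repartition topic and state store
        .reduce(Long::sum)                                         // running sum per key
        .toStream()
        .to("counter-out", Produced.with(Serdes.String(), Serdes.Long()));

Note that the default.deserialization.exception.handler suggested in your stack trace (for example org.apache.kafka.streams.errors.LogAndContinueExceptionHandler) would only make the application skip undecodable records instead of dying; it does not fix the underlying serde mismatch, so the explicit Consumed above is the actual fix.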
Upvotes: 3