I need to get data from Kafka topic as a Zio Stream, data there is in the google protobuf format, also i need to check schema
I use the following sample protobuf file which generates proto.Data Java class for me:
syntax = "proto3";
package proto;
import "google/protobuf/timestamp.proto";
option java_multiple_files = true;
option java_outer_classname = "Protos";
message Data {
string id = 1;
google.protobuf.Timestamp receiveTimestamp = 2;
If i use the following properties i am able to get data as KStream[proto.Data] (so using kafka api) for the proto.Data proto Message class
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, s"kstream-application-${java.util.UUID.randomUUID().toString}")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p.put("security.protocol", "SSL")
p.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, "io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde")
p.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
p.put("", "false")
p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
p.put("specific.protobuf.value.type", classOf[proto.Data])
And here is the example of code using the KStream (I am able to print record with exact Id equals 1 only):
val builder: StreamsBuilder = new StreamsBuilder
val risks: KStream[String, proto.Data] =
.stream[String, proto.Data](topic)
.filter((_, value) => value.getId=="1")
val sysout = Printed
.toSysOut[String, proto.Data]
val streams: KafkaStreams = new KafkaStreams(, props)
sys.ShutdownHookThread {
Now if i use zio kafka and same properties somehow i am able to print out the whole stream:
val props: Map[String, AnyRef] = Map(
StreamsConfig.APPLICATION_ID_CONFIG -> s"kstream-application-${java.util.UUID.randomUUID().toString}",
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
"security.protocol" -> "SSL",
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG -> Serdes.String.getClass.getName,
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG -> "io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde",
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081",
"" -> "false",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
"specific.protobuf.value.type" -> classOf[proto.Data]
val myStream = for {
serdeProto <- Serde.fromKafkaSerde(new KafkaProtobufSerde[proto.Data](), props, true)
_ <- stream
.plainStream(Serde.string, serdeProto)
.provideSomeLayer(consumer ++
.tap(r => console.putStrLn(s"stream: $r"))
} yield ()
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
But if i try to filter only record with Id equals 1
val myStream = for {
serdeProto <- Serde.fromKafkaSerde(new KafkaProtobufSerde[proto.Data](), props, true)
_ <- stream
.plainStream(Serde.string, serdeProto)
.provideSomeLayer(consumer ++
.tap(r => console.putStrLn(s"stream: $r"))
} yield ()
I get error like
Fiber failed.
An unchecked error was produced.
java.lang.ClassCastException: cannot be cast to proto.Data
I was wondering if anybody used zio kafka together with google protobuf and deserialization to the Java proto class was successful when you read data from the topic?
