Reputation: 806
We have these dependencies:
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
We use a code generator to generate Scala case classes from AVRO schema files. One such generated case class has an Either field. In the AVRO schema this is expressed as a union, "type": [t1, t2], so the generation seems reasonable: a sum type that can be either type t1 or type t2.
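For illustration, an Avro union field of this shape (the field name and branch types are hypothetical, not from the actual schema):

```json
{
  "name": "payload",
  "type": ["string", "long"]
}
```

A generator along these lines would plausibly map such a union to something like `payload: Either[String, Long]` in the case class.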
The question is what is missing on the deserialization path from topic to case class (binary -> AVRO GenericRecord -> case class).
Currently I am getting this error:
could not find implicit value for parameter consumed: org.apache.kafka.streams.scala.kstream.Consumed[String, custom.UserEvent]
[error] .stream[String, UserEvent]("schma.avsc")
The first thought was kafka-streams-avro-serde, but it may be that this library only ensures a Serde[GenericRecord] for AVRO, not one for case classes. So one of the other dependencies must be helping with the mapping between AVRO GenericRecord and case classes and back. We also have some hand-written code that generates case classes from schemas; that seems to work directly with spray-json.
I'm thinking that in the (binary <-> AVRO GenericRecord <-> case class instance) transformations there is a gap, and it could be the Either field in the case class?
I'm now taking the path of creating a Serde[UserEvent] instance myself. In my understanding that would involve converting between UserEvent and an AVRO GenericRecord, and then between the GenericRecord and binary, which is likely covered by the kafka-streams-avro-serde dependency; there should be a Serde[GenericRecord] or similar there.
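A minimal sketch of that idea, assuming avro4s's RecordFormat can bridge UserEvent and GenericRecord (avro4s maps Either to an Avro union) and Confluent's GenericAvroSerde handles GenericRecord <-> binary; the registry URL is a placeholder:

```scala
import java.util.Collections
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.{Serde, Serdes => JavaSerdes}
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import com.sksamuel.avro4s.RecordFormat

// GenericRecord <-> binary, backed by the schema registry (URL is a placeholder)
val genericSerde: Serde[GenericRecord] = {
  val s = new GenericAvroSerde
  s.configure(
    Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
    false) // isKey = false
  s
}

// UserEvent <-> GenericRecord via avro4s
val format: RecordFormat[UserEvent] = RecordFormat[UserEvent]

// Compose the two halves into a Serde[UserEvent]
implicit val userEventSerde: Serde[UserEvent] = JavaSerdes.serdeFrom(
  (topic: String, event: UserEvent) =>
    genericSerde.serializer.serialize(topic, format.to(event)),
  (topic: String, bytes: Array[Byte]) =>
    format.from(genericSerde.deserializer.deserialize(topic, bytes))
)
```

Whether RecordFormat derivation succeeds for the Either field is exactly the point to verify; if it does, this serde can be made implicit and the Consumed instance should resolve.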
Import-wise, we have these to bring the implicits into scope:
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.Consumed
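With those imports, ImplicitConversions derives Consumed[String, UserEvent] from implicit Serde instances for the key and value types; Serdes._ supplies Serde[String], but nothing supplies Serde[UserEvent], hence the compile error. A sketch of what resolves it (the serde body and topic name are placeholders):

```scala
import org.apache.kafka.streams.scala.StreamsBuilder

implicit val userEventSerde: Serde[UserEvent] = ??? // an Avro-backed serde

val builder = new StreamsBuilder()
// Consumed[String, UserEvent] is now derived implicitly
val events = builder.stream[String, UserEvent]("user-events")
```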
Upvotes: 2
Views: 1851
Reputation: 725
For me, I had to follow the directions more closely and add an implicit Serde implementation. Their example in the link looks like this:
// An implicit Serde implementation for the values we want to
// serialize as avro
implicit val userClicksSerde: Serde[UserClicks] = new AvroSerde
For a fuller example, see the scala tests for their avro lib:
// Make an implicit serde available for GenericRecord, which is required for operations such as `to()` below.
implicit val genericAvroSerde: Serde[GenericRecord] = {
  val gas = new GenericAvroSerde
  val isKeySerde: Boolean = false
  gas.configure(
    Collections.singletonMap(
      AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
      cluster.schemaRegistryUrl),
    isKeySerde)
  gas
}
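With that implicit in scope (plus Serdes._ for a String key serde), the Scala DSL can derive both Consumed and Produced for GenericRecord streams; the topic names here are placeholders:

```scala
val builder = new StreamsBuilder()
builder
  .stream[String, GenericRecord]("input-topic") // Consumed derived implicitly
  .to("output-topic")                           // Produced derived implicitly
```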
Upvotes: 0
Reputation: 318
In fact, an import was missing; now it compiles. Here are the imports:
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
Upvotes: 6
Reputation: 62330
Did you import the corresponding package?
import org.apache.kafka.streams.scala.ImplicitConversions._
Cf. https://kafka.apache.org/24/documentation/streams/developer-guide/dsl-api.html#scala-dsl
Upvotes: 0