Reputation: 573
i have a simple case class
case class KafkaContainer(key: String, payload: AnyRef)
then i want to send this to kafka topic via producer i do this
val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))
and this is working well
then i try to consume this
Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
.map { msg =>
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
input.close()
...
}.runWith(Sink.ignore)
and this is working well with any class in payload.
But! If it AnyRef. Consumer code fails with
Error:(38, 96) could not find implicit value for evidence parameter of type com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
Error:(38, 96) not enough arguments for method binary: (implicit evidence$21: com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer], implicit evidence$22: com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]. Unspecified value parameter evidence$22. val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
if i declare implicits with
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
it fail to compile with
Error:(58, 71) could not find Lazy implicit value of type com.sksamuel.avro4s.FromValue[Object] implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
Error:(58, 71) not enough arguments for method lazyConverter: (implicit fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]. Unspecified value parameter fromValue. implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
if a add every implicit that complier is require
lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
compile fails with error
Error:(58, 69) exception during macro expansion: java.lang.IllegalArgumentException: requirement failed: Require a case class but Object is not at scala.Predef$.require(Predef.scala:277) at com.sksamuel.avro4s.FromRecord$.applyImpl(FromRecord.scala:283) implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
but if i replace AnyRef for some class - no implicit required, everything works fine again
Upvotes: 0
Views: 547
Reputation: 147
I have a similar problem using the Any data type. You have to specify what types for this member variable are valid as Any or AnyRef could be anything. Then use Either or shapeless (also see the Github Documentation). For my case it can be String, Long, Double or null, so using shapeless you can do:
case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])
This converts into a union type in AVRO:
{
"name" : "value",
"type" : [ "null", "string", "long", "double" ]
}
Upvotes: 1