Reputation: 1
I'm trying to send data to Kafka with Avro, but the messages remain empty. I tried adding the schema to Kafka manually, but that did not help. I don't see any errors in my application's logs, in the schema-registry logs, or in Kafka's logs. Can you tell me what I am doing wrong?
import java.util.UUID
import java.util.concurrent.Future // producer.send returns a java.util.concurrent.Future
import scala.jdk.CollectionConverters._ // for .asJava (Scala 2.13+; older versions use scala.collection.JavaConverters._)
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }

case class User(name: String, age: Int)

val connectConfig: Map[String, Object] =
  Map(
    "bootstrap.servers" -> "localhost:9092",
    "client.id" -> s"clientID${UUID.randomUUID().toString}",
    "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "localhost:8081"
  )

implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)

def sendAvro(msg: DIL): Future[RecordMetadata] = {
  val newUser = User("Joe", 42)
  val schema = AvroSchema[User]
  implicit val schemaFor = SchemaFor[User]
  val format = RecordFormat[User]
  // Record with no key: only the topic and the Avro value are set
  val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
  producer.send(record)
}
Upvotes: 0
Views: 367
Reputation: 191728
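The Kafka producer sends records asynchronously in batches, and your single record is smaller than the default batch size, so it may still be sitting in the producer's buffer when your application exits.
Use producer.send(record).get() or producer.flush() to wait until the record has actually been sent.
Otherwise, you should register a shutdown hook (scala.sys.ShutdownHookThread) that flushes the producer.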
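A rough sketch of the three options, in case it helps. The object name FlushExamples, the helper names, and the 10-second timeout are my own illustrative choices, not anything from your code; the helpers take the generic Producer interface so they should work with your KafkaProducer[String, Record] as well.

```scala
import java.util.concurrent.TimeUnit
import org.apache.kafka.clients.producer.{ Producer, ProducerRecord, RecordMetadata }

// Hypothetical helpers for illustration only.
object FlushExamples {

  // Option 1: block until this one record has been acknowledged.
  // The 10-second timeout is an arbitrary assumption; tune it for your setup.
  def sendAndWait[K, V](producer: Producer[K, V], record: ProducerRecord[K, V]): RecordMetadata =
    producer.send(record).get(10, TimeUnit.SECONDS)

  // Option 2: send asynchronously, then push out everything buffered so far.
  def sendAndFlush[K, V](producer: Producer[K, V], record: ProducerRecord[K, V]): Unit = {
    producer.send(record)
    producer.flush()
  }

  // Option 3: keep sending asynchronously and flush once when the JVM shuts down.
  def flushOnShutdown[K, V](producer: Producer[K, V]): scala.sys.ShutdownHookThread =
    sys.addShutdownHook {
      producer.flush()
      producer.close()
    }
}
```

In your sendAvro you could, for example, call FlushExamples.sendAndWait(producer, record) instead of producer.send(record). Blocking on every record costs throughput, so for anything beyond a quick test the shutdown hook is probably the better fit.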
Upvotes: 0