Dmitry Ivanov

Reputation: 1

Data not sent to Kafka with Avro serialization

I'm trying to send data to Kafka with Avro serialization, but the "messages" remain empty. I tried adding the schema manually, but it made no difference. I don't see any errors in my application's logs, in the schema-registry logs, or in Kafka's logs. Can you tell me what I am doing wrong?

import java.util.UUID
import java.util.concurrent.Future // producer.send returns a Java Future

import scala.collection.JavaConverters._ // provides .asJava

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }

case class User(name: String, age: Int)

val connectConfig: Map[String, Object] =
  Map(
    "bootstrap.servers"   -> "localhost:9092",
    "client.id"           -> s"clientID${UUID.randomUUID().toString}",
    "key.serializer"      -> "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer"    -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
    "schema.registry.url" -> "localhost:8081"
  )

implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)

// DIL is defined elsewhere in the application (not shown)
def sendAvro(msg: DIL): Future[RecordMetadata] = {
  val newUser = User("Joe", 42)
  val schema  = AvroSchema[User]
  implicit val schemaFor = SchemaFor[User]
  val format = RecordFormat[User]

  val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
  producer.send(record)
}

Upvotes: 0

Views: 367

Answers (1)

OneCricketeer

Reputation: 191728

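The Kafka producer sends data asynchronously in batches, and your single record is smaller than the default batch size, so it can still be sitting in the producer's buffer when your application exits.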

Use producer.send(record).get() to block until that one record has been sent, or call producer.flush() after sending to block until everything buffered has been sent.
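The two blocking options above can be sketched roughly as follows. This is only an illustration under some assumptions: plain String values stand in for the Avro Record so the example stays focused on flushing, the broker address is the one from the question, and the names FlushExample, sendAndWait and sendThenFlush are hypothetical helpers, not part of the Kafka API.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord, RecordMetadata}

object FlushExample {
  // Option 1: block on the java.util.concurrent.Future returned by send().
  def sendAndWait(producer: Producer[String, String],
                  record: ProducerRecord[String, String]): RecordMetadata =
    producer.send(record).get()

  // Option 2: send asynchronously, then block until all buffered records are sent.
  def sendThenFlush(producer: Producer[String, String],
                    records: Seq[ProducerRecord[String, String]]): Unit = {
    records.foreach(r => producer.send(r))
    producer.flush()
  }

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      val record = new ProducerRecord[String, String]("kafkaLogTopic", "hello")
      val metadata = sendAndWait(producer, record)
      println(s"sent to ${metadata.topic()}, partition ${metadata.partition()}")
      sendThenFlush(producer, Seq(record, record))
    } finally {
      producer.close() // close() also waits for outstanding sends to complete
    }
  }
}
```

Blocking on every send trades throughput for immediacy, so it is mostly useful for one-off sends and debugging like the case in the question.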

If you would rather not block after each send, register a shutdown hook with scala.sys.ShutdownHookThread that flushes the producer before the JVM exits.
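A shutdown hook along those lines might look like the sketch below. The object and method names (ProducerShutdown, flushOnShutdown) are hypothetical; only scala.sys.ShutdownHookThread and the producer's flush() and close() come from the libraries themselves.

```scala
import org.apache.kafka.clients.producer.Producer

object ProducerShutdown {
  // Registers a JVM shutdown hook that flushes any buffered records
  // and then closes the producer. The hook runs on normal JVM exit,
  // not when the process is killed forcibly.
  def flushOnShutdown[K, V](producer: Producer[K, V]): scala.sys.ShutdownHookThread =
    scala.sys.ShutdownHookThread {
      producer.flush()
      producer.close()
    }
}
```

Calling ProducerShutdown.flushOnShutdown(producer) once, right after creating the producer, should be enough; the returned ShutdownHookThread can be kept if the hook ever needs to be removed again.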

Upvotes: 0
