Iheb Mar
Iheb Mar

Reputation: 98

Send Json to kafka Topic

I'm trying to send Json data into kafka topics: I have this Scala code:

  def sendMessage(sender_id: String, receiver_id: String, content: String) = {
    val newMessage = new Producer(brokers = KAFKA_BROKER, key = "1", topic = "topic_2", message = {
      sender_id + receiver_id + content
    })
    newMessage.sendMessages()

  }

I want to send the sender_id, receiver_id & content as Json data to my topic but that doesn't seem to work. This is my producer code

class Producer(topic: String, key: String, brokers: String, message: String) {

  val producer = new KafkaProducer[String, String](configuration)

  private def configuration: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
    props
  }

  def sendMessages(): Unit = {
    val record = new ProducerRecord[String, String](topic, key, message)
    producer.send(record)
    producer.close()
  }

I want to save the content as Json to custom retreive it later from my consumer.

Upvotes: 0

Views: 1974

Answers (1)

OneCricketeer
OneCricketeer

Reputation: 191681

This is not JSON, it is a code block; most importantly, JSON has key-value pairs.

   message = {
      sender_id + receiver_id + content
    }

You would need to add an external library such as json4s to create a JSON string and/or case-class object for serialization.

val json = ("sender_id" -> sender_id, ...)
producer.send(new ProducerRecord(topic, compact(render(json)))

Upvotes: 1

Related Questions