Reputation: 98
I'm trying to send JSON data to Kafka topics. I have this Scala code:
def sendMessage(sender_id: String, receiver_id: String, content: String) = {
  val newMessage = new Producer(brokers = KAFKA_BROKER, key = "1", topic = "topic_2", message = {
    sender_id + receiver_id + content
  })
  newMessage.sendMessages()
}
I want to send sender_id, receiver_id, and content as JSON data to my topic, but that doesn't seem to work. This is my producer code:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

class Producer(topic: String, key: String, brokers: String, message: String) {
  val producer = new KafkaProducer[String, String](configuration)

  private def configuration: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.ACKS_CONFIG, "all")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
    props
  }

  def sendMessages(): Unit = {
    val record = new ProducerRecord[String, String](topic, key, message)
    producer.send(record)
    producer.close()
  }
}
I want to save the content as JSON so that I can retrieve it later from my consumer.
Upvotes: 0
Views: 1974
Reputation: 191681
This is not JSON. The braces here are just a Scala block expression whose value is the three strings concatenated; most importantly, JSON has key-value pairs.
message = {
  sender_id + receiver_id + content
}
You would need to add an external library such as json4s to create a JSON string and/or case-class object for serialization.
// json4s DSL: pairs are joined into one JSON object with ~
val json = ("sender_id" -> sender_id) ~ ...
producer.send(new ProducerRecord(topic, compact(render(json))))
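To make the json4s suggestion concrete, here is a minimal sketch of building the JSON string and reading a field back. It assumes the json4s-jackson module is on the classpath; the object name `MessageJson` and the helpers `toJson` and `field` are hypothetical names chosen for illustration, not part of the question's code or of json4s.

```scala
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, parse, render}

// Hypothetical helper object; the names are illustrative only.
object MessageJson {

  // Build a compact JSON string. In the json4s DSL, `~` joins
  // key-value pairs into a single JSON object, and string values
  // are escaped by the renderer.
  def toJson(senderId: String, receiverId: String, content: String): String = {
    val json =
      ("sender_id" -> senderId) ~
      ("receiver_id" -> receiverId) ~
      ("content" -> content)
    compact(render(json))
  }

  // Consumer side: parse a record value and read one string field.
  // Returns None if the field is missing or is not a JSON string.
  def field(value: String, name: String): Option[String] =
    parse(value) \ name match {
      case JString(s) => Some(s)
      case _          => None
    }
}
```

With the Producer class from the question, the result of MessageJson.toJson(sender_id, receiver_id, content) could be passed as the message argument in place of the concatenated string. The existing StringSerializer settings would stay as they are, since the JSON is still sent as a plain string.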
Upvotes: 1