Shivansh

Reputation: 3544

How to save an Akka Stream of case classes to Kafka directly?

I am able to save my data to Kafka in the form of a String like this:

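```scala
import akka.NotUsed
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

val producerSettings = ProducerSettings(actorSystem, new StringSerializer, new StringSerializer)
  .withBootstrapServers("localhost:9092")

def kafkaSink(source: Source[ModbusMessage, NotUsed]) =
  source
    .map(a => s"Id:${a.sensorId}:Values:${a.values.mkString(",")}") // format each message as a String
    .map(m => new ProducerRecord[String, String]("sampleData", m))
    .runWith(Producer.plainSink(producerSettings)) // needs an implicit Materializer in scope
```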

But is there a way to save my case class directly to Kafka? For example, I want to save my data in the form of my case class ModbusMessage.

If someone could provide a quick example, that would be great!

Thanks, any help is appreciated!

Upvotes: 1

Views: 606

Answers (1)

miguno

Reputation: 15077

Kafka's model of a data message, which consists of a message key and a message value, is based on raw bytes (byte[]) for keys and values. So you need to provide a serializer that converts your case class into byte[]. In your example above, you have configured the use of StringSerializer for both keys and values, and the serializer converts String -> byte[].
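To make the byte-level model concrete: the `StringSerializer` used in the question does nothing more than turn a `String` into UTF-8 bytes. The following sketch assumes kafka-clients is on the classpath; the payload string and the `StringSerializerDemo` object are made up for illustration. It produces the same bytes once through Kafka's serializer and once through the JDK.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.serialization.StringSerializer

object StringSerializerDemo {
  // Example of the String the question's pipeline builds for one message (made-up values).
  val payload: String = "Id:1:Values:1.5,2.25"

  // Kafka's StringSerializer turns the String into raw bytes (UTF-8 by default)...
  val viaKafka: Array[Byte] = new StringSerializer().serialize("sampleData", payload)

  // ...which are exactly the bytes the JDK produces for UTF-8.
  val viaJdk: Array[Byte] = payload.getBytes(StandardCharsets.UTF_8)
}
```

A serializer for a case class plays the same role, except that the input is a `ModbusMessage` instead of a `String`.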

> but is there a way to save my case class directly into Kafka. Like if I want to save my data in form of My case class ModbusMessage.

If you have a case class ModbusMessage, then you need to implement and configure a ModbusMessage -> byte[] serializer. You can implement such a serializer yourself, but as others have commented on your question, you can also opt for serialization frameworks such as Avro or Protobuf. You may also want to look at, e.g., https://github.com/scala/pickling.
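A hand-written serializer could look roughly like the following minimal sketch. It assumes kafka-clients is on the classpath. The field types of `ModbusMessage` (`sensorId: Int`, `values: Seq[Double]`) are guesses, because the question only shows the field names. The class name `ModbusMessageSerializer` and the byte layout are hypothetical choices, not a standard format.

```scala
import java.nio.ByteBuffer
import java.util.{Map => JMap}
import org.apache.kafka.common.serialization.Serializer

// Hypothetical field types: the question only shows `sensorId` and `values`.
final case class ModbusMessage(sensorId: Int, values: Seq[Double])

// Hypothetical ModbusMessage -> Array[Byte] serializer.
// Assumed layout: sensorId (4 bytes), value count (4 bytes), then each value
// as an 8-byte double; ByteBuffer writes big-endian by default.
class ModbusMessageSerializer extends Serializer[ModbusMessage] {
  // Nothing to configure. Overridden explicitly so the class also compiles against
  // older kafka-clients versions where configure/close are abstract.
  override def configure(configs: JMap[String, _], isKey: Boolean): Unit = ()

  override def serialize(topic: String, data: ModbusMessage): Array[Byte] =
    if (data == null) null // Kafka's built-in serializers also map null to null
    else {
      val buffer = ByteBuffer.allocate(4 + 4 + 8 * data.values.size)
      buffer.putInt(data.sensorId)
      buffer.putInt(data.values.size)
      data.values.foreach(v => buffer.putDouble(v))
      buffer.array()
    }

  override def close(): Unit = ()
}
```

To configure it, pass an instance as the value serializer, e.g. `ProducerSettings(actorSystem, new StringSerializer, new ModbusMessageSerializer)`. Then emit `new ProducerRecord[String, ModbusMessage]("sampleData", m)` instead of the formatted string; `Producer.plainSink(producerSettings)` stays the same. Consumers need a matching `Deserializer[ModbusMessage]` that reads the same layout.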

> Do I need to convert it into Json using library like liftJson and then save it as a String inside Kafka ?

No, you don't need to. You can, and most probably should, convert directly from your case class ModbusMessage to byte[] (and in the opposite direction for deserialization). There's no point in converting your case class to JSON first, because the JSON representation must itself be serialized to byte[] before it can be sent to Kafka, so you would pay for two conversions instead of one.
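The direct conversion in both directions, with no intermediate JSON string, can be sketched in plain Scala. As before, the field types of `ModbusMessage` and the byte layout are assumptions, and `ModbusCodec` is a hypothetical name.

```scala
import java.nio.ByteBuffer

// Hypothetical field types: the question only shows `sensorId` and `values`.
final case class ModbusMessage(sensorId: Int, values: Seq[Double])

// Direct case class <-> bytes conversion (assumed layout: sensorId, count, values).
object ModbusCodec {
  def encode(m: ModbusMessage): Array[Byte] = {
    val buffer = ByteBuffer.allocate(4 + 4 + 8 * m.values.size)
    buffer.putInt(m.sensorId).putInt(m.values.size)
    m.values.foreach(v => buffer.putDouble(v))
    buffer.array()
  }

  // The opposite direction, as a consumer-side Deserializer would need it.
  def decode(bytes: Array[Byte]): ModbusMessage = {
    val buffer = ByteBuffer.wrap(bytes)
    val sensorId = buffer.getInt()
    val count = buffer.getInt()
    val values = Vector.fill(count)(buffer.getDouble()) // reads the doubles in order
    ModbusMessage(sensorId, values)
  }
}
```

Inside Kafka, `encode` would back a `Serializer[ModbusMessage]` and `decode` a `Deserializer[ModbusMessage]`. A hand-rolled layout like this has to be changed carefully whenever the case class gains or loses fields. Schema-based formats such as Avro or Protobuf are designed to handle that kind of evolution.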

Upvotes: 2
