Reputation: 1640
I am using Kafka's Streams API with the topology builder. I would like to know how to write a processor that converts one data type to another, so that the next processor in the pipeline can use the converted value.
A simple use case:
[topic]--(string)-->[processor: parse json]--(object)-->[processor 2]--(object)-->[sink]
Any ideas?
Upvotes: 2
Views: 2702
Reputation: 15057
I assume you want to convert the message values in a Kafka topic from String to JSON.
You only need two parts: (1) define a String value serde for reading from your source topic, and (2) define a corresponding value serde for writing the JSON data (or POJO) to the destination topic. Serdes are required to materialize your data when and where needed (e.g., writing your POJOs to Kafka requires materialization).
See the example code under https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview for one way to use JSON with Apache Kafka's Streams API.
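To make the two parts concrete, here is a minimal sketch in plain Java with no Kafka dependency. It only illustrates the data flow (bytes to String, String to object, object to object, object back to bytes); it is not the Streams API itself. The `PageView` class, the toy regex parser, and every method name are assumptions made up for illustration, and a real application would use a JSON library and real serdes instead. In the Streams DSL, a value-mapping step such as `KStream#mapValues` would usually take the place of `parse` here.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SerdeSketch {

    // Hypothetical POJO standing in for the parsed message value.
    static final class PageView {
        final String user;
        final String page;

        PageView(String user, String page) {
            this.user = user;
            this.page = page;
        }
    }

    // Toy matcher for flat objects such as {"user":"alice","page":"home"}.
    // It handles no escaping or nesting; it is a stand-in for a JSON library.
    private static final Pattern FIELD =
            Pattern.compile("\"(\\w+)\"\\s*:\\s*\"([^\"]*)\"");

    // Part 1: what a String serde does when reading from the source topic.
    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // First processor: String in, object out.
    static PageView parse(String json) {
        String user = null;
        String page = null;
        Matcher m = FIELD.matcher(json);
        while (m.find()) {
            if (m.group(1).equals("user")) user = m.group(2);
            if (m.group(1).equals("page")) page = m.group(2);
        }
        if (user == null || page == null) {
            throw new IllegalArgumentException("missing field in: " + json);
        }
        return new PageView(user, page);
    }

    // Second processor: object in, object out (arbitrary example logic).
    static PageView upperCasePage(PageView v) {
        return new PageView(v.user, v.page.toUpperCase(Locale.ROOT));
    }

    // Part 2: what a value serde does when writing to the destination topic.
    static byte[] serializePageView(PageView v) {
        String json = "{\"user\":\"" + v.user + "\",\"page\":\"" + v.page + "\"}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // [topic]--(string)-->[parse]--(object)-->[processor 2]--(object)-->[sink]
    static byte[] pipeline(byte[] in) {
        Function<byte[], String> read = SerdeSketch::deserializeString;
        return read.andThen(SerdeSketch::parse)
                   .andThen(SerdeSketch::upperCasePage)
                   .andThen(SerdeSketch::serializePageView)
                   .apply(in);
    }

    public static void main(String[] args) {
        byte[] in = "{\"user\":\"alice\",\"page\":\"home\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(new String(pipeline(in), StandardCharsets.UTF_8));
    }
}
```

The point of the sketch is that only the two ends of the pipeline touch bytes. The steps in between pass ordinary Java objects, which is why a serde is needed only where data is read from or written to a topic.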
Upvotes: 2