Reputation: 16465
Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? Just to be clear, I am not looking for a Java-based listener that iterates/streams records to a new topic; instead, I am looking for a KSQL query that does that for me.
For example, let's say I need messages in the invoice topic split into item_inventory_delta messages.

invoice

key: saleschecknumber

message example:

{
  "total": 12.33,
  "salecounter": 1,
  "items": [
    {
      "itemId": 123,
      "quantity": 1
    },
    {
      "itemId": 345,
      "quantity": 5
    }
  ]
}

item_inventory_delta

key: saleschecknumber_itemID

message examples:

1.
{
  "itemId": 123,
  "quantity": 1
}

2.
{
  "itemId": 345,
  "quantity": 5
}
Upvotes: 4
Views: 4254
Reputation: 61
For a KStream application, you can use flatMap, which accepts a function that takes a record and returns an iterable of zero or more records. For example:
import org.apache.kafka.streams.scala.kstream.KStream

case class Record(total: Double, salecounter: Int, items: List[Item])
case class Item(itemId: Int, quantity: Int)

// Initialize the stream.
val inputStream: KStream[String, Record] = ???

// Split each invoice record into one output record per item, keeping the original key.
inputStream.flatMap { case (key, record) =>
  record.items.map(item => (key, item))
}
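If you are on the Java DSL instead, the same idea looks roughly like the sketch below; it also re-keys each output record as saleschecknumber_itemID and writes the result to the target topic, as the question asks. The Record/Item classes and the recordSerde/itemSerde JSON Serdes are placeholders you would have to supply yourself.

import java.util.stream.Collectors;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// recordSerde and itemSerde are placeholder JSON Serdes for the Record and Item classes.
KStream<String, Record> invoices =
        builder.stream("invoice", Consumed.with(Serdes.String(), recordSerde));

// Emit one output record per item, keyed by saleschecknumber_itemID.
invoices
    .flatMap((key, record) -> record.getItems().stream()
            .map(item -> KeyValue.pair(key + "_" + item.getItemId(), item))
            .collect(Collectors.toList()))
    .to("item_inventory_delta", Produced.with(Serdes.String(), itemSerde));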
Upvotes: 1
Reputation: 3832
There are many ways to handle this. To my understanding, it is more about how we process the incoming messages than about aggregating them. An easy way is to use the Kafka Streams Processor API, which allows you to customize the processing logic.
The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic
Note: You have not defined what the output value should be, so I am just forwarding the same value as both key and value, but it's your choice; you can define your own output key and value.
You can define the processor topology as below:
Topology builder = new Topology();

builder.addSource("Source", "invoice")
       .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
       .addSink("sinkDeltaInvoice", "item_inventory_delta",
                Serdes.String().serializer(), Serdes.String().serializer(),
                "sourceProcessor");
Below is the custom Processor; please note it's just an approach, not a full implementation.
import com.google.gson.Gson;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;

class InvoiceProcessor implements Processor<String, String> {

    private final Gson gson = new Gson();
    private ProcessorContext context;

    // constructor
    // .......

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Any code for clean up would go here. This processor instance will not be used
        // again after this call.
    }

    @Override
    public void process(String key, String value) {
        try {
            // Map the JSON value onto a custom Inventory object.
            // List<Item> items is a member of the Inventory class.
            Inventory inventory = gson.fromJson(value, Inventory.class);

            // Iterate over the items of the invoice and forward each one
            // individually to the "sinkDeltaInvoice" sink.
            for (Item item : inventory.getItems()) {
                context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
            }
        } catch (Exception e) {
            // Handle malformed messages as appropriate.
        }
    }
}
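To actually run this, you wire the Topology into a KafkaStreams instance and start it. A minimal sketch, assuming a local broker at localhost:9092 and a placeholder application id:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// "invoice-splitter" and "localhost:9092" are placeholders; adjust them for your environment.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invoice-splitter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// "builder" is the Topology defined above.
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

// Close the application cleanly on shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));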
Upvotes: 1
Reputation: 32050
As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.
Given a topic invoice with a JSON payload per your example, first inspect the topic using PRINT to dump its contents:
ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}
Then declare a schema on top of the topic, which gives us a ksqlDB stream:
CREATE STREAM INVOICE (total       DOUBLE,
                       salecounter INT,
                       items       ARRAY<STRUCT<itemId   INT,
                                                quantity INT>>)
  WITH (KAFKA_TOPIC='invoice',
        VALUE_FORMAT='JSON');
This simply "registers" the existing topic for use with ksqlDB. No new Kafka topics are written until the next step.
Create a new Kafka topic, populated continually from the messages arriving in the source stream:
CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS
  SELECT EXPLODE(ITEMS)->ITEMID   AS ITEMID,
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY
    FROM INVOICE;
The new topic has been created:
ksql> SHOW TOPICS;
 Kafka Topic          | Partitions | Partition Replicas
--------------------------------------------------------
 invoice              | 1          | 1
 item_inventory_delta | 1          | 1
The topic has the delta messages as requested :)
ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}
Upvotes: 9