so-random-dude

Reputation: 16465

Kafka streams - KSQL - Split messages and publish to another topic

Is there a way to split a message into multiple messages using KSQL and publish them to a new topic? Just to be clear, I am not looking for a Java-based listener that iterates/streams it to a new topic; instead, I am looking for KSQL that does that for me.

For example:

Let's say I need messages in the invoice topic split into item_inventory_delta messages.

invoice topic

key: saleschecknumber

message example:

{
    "total": 12.33,
    "salecounter": 1,
    "items": [
        {
            "itemId": 123,
            "quantity": 1
        },
        {
            "itemId": 345,
            "quantity": 5
        }
    ]
}

item_inventory_delta topic

key: saleschecknumber_itemID

message examples

1.

{
    "itemId": 123,
    "quantity": 1
}

2.

{
    "itemId": 345,
    "quantity": 5
}

Upvotes: 4

Views: 4254

Answers (3)

Brandon Mott

Reputation: 61

For a Kafka Streams application, you can use flatMap, which accepts a function that takes a record and returns an iterable of zero or more records. For example:

import org.apache.kafka.streams.scala.kstream.KStream

case class Record(total: Double, salecounter: Int, items: List[Item])
case class Item(itemId: Int, quantity: Int)

// Initialize the stream, e.g. from StreamsBuilder#stream.
val inputStream: KStream[String, Record] = ???

// Split each invoice into one output record per item. To match the question's
// key scheme, emit s"${key}_${item.itemId}" instead of key.
inputStream.flatMap { (key, record) =>
  record.items.map(item => (key, item))
}
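From there, the flattened stream can be written back out with something like .to("item_inventory_delta") (with appropriate Serdes in scope); the snippet above leaves that wiring to the reader.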

Upvotes: 1

Nitin

Reputation: 3832

There are many ways to handle this; to my understanding it is more about how we process incoming messages than about aggregating them. An easy way is to use the Kafka Streams Processor API, which allows you to customize the processing logic.

Kafka Stream Processor API

The Processor API allows developers to define and connect custom processors and to interact with state stores. With the Processor API, you can define arbitrary stream processors that process one received record at a time, and connect these processors with their associated state stores to compose the processor topology that represents a customized processing logic.

Note: you have not defined what the output value should be, so I am just forwarding the same JSON as both key and value; it is your choice, and you can define your own output key and value.

You can define the Processor API topology as below:

Topology topology = new Topology();
topology.addSource("Source", "invoice")
        .addProcessor("sourceProcessor", () -> new InvoiceProcessor(), "Source")
        .addSink("sinkDeltaInvoice", "item_inventory_delta",
                 Serdes.String().serializer(), Serdes.String().serializer(),
                 "sourceProcessor");

Below is the custom Processor approach. Please note it is just a sketch, not a full implementation:

class InvoiceProcessor implements Processor<String, String> {
    private final Gson gson = new Gson();
    private ProcessorContext context;

    // constructor omitted

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void close() {
        // Any code for clean up would go here. This processor instance will not
        // be used again after this call.
    }

    @Override
    public void process(String key, String value) {
        try {
            // Map the JSON message value onto a custom Inventory class;
            // List<Item> items is a member of the Inventory class.
            Inventory inventory = gson.fromJson(value, Inventory.class);

            // Iterate over the items and forward one record per item to the
            // sink node. Key and value are the same JSON here, per the note above.
            for (Item item : inventory.getItems()) {
                context.forward(gson.toJson(item), gson.toJson(item), To.child("sinkDeltaInvoice"));
            }
        } catch (JsonSyntaxException e) {
            // Malformed JSON: skip this record.
        }
    }
}
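To actually run the topology above, it still needs to be wrapped in a KafkaStreams instance. A minimal sketch, assuming a local broker and a made-up application id (both placeholders, not from the answer):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invoice-splitter");   // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// The source node declares no explicit deserializers, so default serdes apply.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));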

Upvotes: 1

Robin Moffatt

Reputation: 32050

As of ksqlDB 0.6 you can now do this, thanks to the addition of the EXPLODE table function.

Given a topic invoice with JSON payload per your example, first inspect the topic using PRINT to dump its contents:

ksql> PRINT invoice FROM BEGINNING;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","total":12.33,"salecounter":1,"items":[{"itemId":123,"quantity":1},{"itemId":345,"quantity":5}]}

Then declare a schema on top of the topic, which gives us a ksqlDB stream:

CREATE STREAM INVOICE (total DOUBLE, 
                       salecounter INT, 
                       items ARRAY<STRUCT<itemId INT, 
                                          quantity INT>>) 
                WITH (KAFKA_TOPIC='invoice', 
                      VALUE_FORMAT='JSON');

This simply "registers" the existing topic for use with ksqlDB. No new Kafka topics are written, until the next step.
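If you want to double-check what was registered, running DESCRIBE INVOICE; at the ksql prompt shows the declared columns without reading any data from the topic.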

Create a new Kafka topic, populated continually from the messages arriving in the source stream:

CREATE STREAM INVENTORY WITH (KAFKA_TOPIC='item_inventory_delta') AS 
  SELECT EXPLODE(ITEMS)->ITEMID AS ITEMID, 
         EXPLODE(ITEMS)->QUANTITY AS QUANTITY 
    FROM INVOICE;

The new topic has been created:

ksql> SHOW TOPICS;

 Kafka Topic                     | Partitions | Partition Replicas
-------------------------------------------------------------------
 invoice                         | 1          | 1
 item_inventory_delta            | 1          | 1

Topic has delta messages as requested :)

ksql> PRINT item_inventory_delta;
Format:JSON
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":123,"QUANTITY":1}
{"ROWTIME":1575366231505,"ROWKEY":"null","ITEMID":345,"QUANTITY":5}

Upvotes: 9
