Vinit Pillai

Reputation: 516

How to process dynamically in kafka streams and send to different topic

I am creating a stream processing app that opens a Kafka Streams connection. When a message arrives, I want to do the following:

Requirements:

  1. Each message should be processed and sent before the next message is processed, using the Streams functions rather than a separate Kafka producer.
  2. If requirement 1 is met, I should be able to send each message to a topic that is chosen dynamically according to its type.

Upvotes: 2

Views: 5839

Answers (2)

OneCricketeer

Reputation: 191681

If you want to check types, you're essentially filtering those events matching those types.

Therefore, you don't need map or foreach; you'd have better luck with filter(...).to(topic)

    final ObjectMapper mapper = Util.getObjectMapper();
    // Keep only the events whose "type" field is not "test"
    KStream<String, String> notTestEvents = input.filter((key, value) -> {
        try {
            // You should probably use JSONDeserializer instead, which does this for you
            JSONObject msg = mapper.readValue(value, JSONObject.class);
            String type = msg.get("type").toString();
            return !type.equalsIgnoreCase("test");
        } catch (Exception e) {
            // Malformed JSON or a missing "type" field: log it and drop the record
            e.printStackTrace();
            return false;
        }
    });
    notTestEvents.to("notStream");

The other option is branching:

KStream<String, String>[] branches = events.branch(
    (k, v) -> {
        try {
            return !mapper
                .readValue(v, JSONObject.class)
                .get("type").toString()
                .equalsIgnoreCase("test");
        } catch (Exception e) {
            // Unparseable records fail this predicate and fall through to the catch-all branch
            return false;
        }
    },
    (k, v) -> true
);
branches[0].map(...).to("notStream");
branches[1].map(...).to("output_topic");

Upvotes: 1

miguno

Reputation: 15057

  1. Each msg should be processed and sent before processing the next msg with the help of stream function and not using another kafka producer

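This happens by default anyway: Kafka Streams processes one record at a time, passing it through the topology to the sink before it picks up the next record.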

  2. If the requirement one is achieved then I should be able to send msg to topics that will be decided dynamically according to type.

First, to make it easier to process events depending on their type, take a look at branch(). The branch() function lets you provide a fixed number of predicates that route messages into different sub-streams; each record goes to the sub-stream of the first predicate it matches. You can then process these sub-streams independently, e.g. with the map() function. Finally, you can send each sub-stream to a separate topic with to().

KStream<String, Event>[] branches = events.branch(
    (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
    (id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);

You can also make truly dynamic routing decisions in the to() based on whatever is in the payload of an event. Here, the name of the output topic is derived from event data.

myStream.to(
  (eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);

Furthermore, if the dynamic routing decision requires information that is not directly available in an event, one option you have is to dynamically enrich the original event with routing-related information (e.g., by joining the original event stream against a table with routing-related info), and then do the dynamic routing via to(). See https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/ for details.
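
In both variants, the routing decision boils down to a plain function from event data to a topic name. Here is a minimal sketch of such a function in plain Java, kept separate from the topology so it can be tested on its own. TopicRouter, its routing map, and the fallback topic are hypothetical names made up for illustration; they are not part of the Kafka Streams API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams: picks an output topic for an event type.
final class TopicRouter {
    private final Map<String, String> routes; // normalized event type -> topic name
    private final String defaultTopic;        // fallback for missing or unknown types

    TopicRouter(Map<String, String> routes, String defaultTopic) {
        // Copy the table with normalized keys so lookups ignore case and surrounding whitespace
        this.routes = new HashMap<>();
        for (Map.Entry<String, String> e : routes.entrySet()) {
            this.routes.put(normalize(e.getKey()), e.getValue());
        }
        this.defaultTopic = defaultTopic;
    }

    private static String normalize(String type) {
        return type.trim().toLowerCase(Locale.ROOT);
    }

    // Returns the topic mapped to the event type, or the default topic if there is none
    String topicFor(String eventType) {
        if (eventType == null) {
            return defaultTopic;
        }
        return routes.getOrDefault(normalize(eventType), defaultTopic);
    }
}
```

Inside the topology this could be wired up as myStream.to((key, event, recordContext) -> router.topicFor(event.getType())), where getType() stands in for whatever accessor your event class has. Every topic the router can return should already exist, since the names are only resolved at runtime.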

Upvotes: 3
