Reputation: 516
I am creating a stream processing app. It should create a Kafka Streams connection. When a message arrives, the following are the things I want to do:
Send it to specific topics that are decided on the basis of the message type
public java.util.function.Consumer<KStream<String, String>> process() {
    String topic;
    return input ->
        input.map((key, value) -> {
            //check type and ask object from factory
            try {
                JSONObject msg = Util.getObjectMapper().readValue(value, JSONObject.class);
                String type = msg.get("type").toString();
                if (type.equalsIgnoreCase("test")) {
                    //processing started
                    msgTypeHandlerFactory
                        .createInstance(type)
                        .enrichAndRelay(msg);
                    System.out.println("IN");
                } else {
                    input.to("notStream");
                    System.out.println("OUT");
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return KeyValue.pair(key, value);
        })
        .to("output_topic");
}
The issue with the above code is that I'm using the map function, which gives me the ability to use .to() to send the stream. What I want is to check every message's type, process it, and send it to another stream accordingly. For that I would have to use the foreach function, which doesn't give me .to(), so I'd have to create another Kafka producer to do the job.
Requirement:
- Each msg should be processed and sent before processing the next msg with the help of stream function and not using another kafka producer
- If the requirement one is achieved then I should be able to send msg to topics that will be decided dynamically according to type.
Upvotes: 2
Views: 5839
Reputation: 191681
If you want to check types, you're essentially filtering those events matching those types.
Therefore, you don't need map or foreach; you'd have better luck with filter(...).to(topic)
final ObjectMapper mapper = Util.getObjectMapper();

KStream<String, String> notTestEvents = input.filter((key, value) -> {
    //check type and ask object from factory
    try {
        JSONObject msg = mapper.readValue(value, JSONObject.class); // You should probably use a JSON Deserializer instead, which does this for you
        String type = msg.get("type").toString();
        System.out.println("OUT");
        return !type.equalsIgnoreCase("test");
    } catch (JsonProcessingException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
    return false; // drop messages that could not be parsed
});
notTestEvents.to("notStream");
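Regarding the deserializer comment above, here is a minimal sketch of that idea. It assumes the Kafka Connect JSON (de)serializers are on the classpath and a plain StreamsBuilder named builder with a placeholder "input_topic" name; none of this is from the original question:

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();

// Build a Serde<JsonNode> from the Kafka Connect JSON (de)serializers
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

// "input_topic" is a placeholder; values arrive already parsed as JsonNode
KStream<String, JsonNode> events = builder.stream("input_topic",
        Consumed.with(Serdes.String(), jsonSerde));

// The filter can then read the type field directly (assumes every message has a "type" field)
events.filter((key, value) -> !value.get("type").asText().equalsIgnoreCase("test"))
      .to("notStream");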
The other option is branching
KStream<String, String>[] branches = events.branch(
    (k, v) -> {
        try {
            return !mapper
                .readValue(v, JSONObject.class)
                .get("type").toString()
                .equalsIgnoreCase("test");
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    },
    (k, v) -> true
);

branches[0].map(...).to("notStream");
branches[1].map(...).to("output_topic");
Upvotes: 1
Reputation: 15057
- Each msg should be processed and sent before processing the next msg with the help of stream function and not using another kafka producer
This will happen anyway by default.
- If the requirement one is achieved then I should be able to send msg to topics that will be decided dynamically according to type.
First, to make the step easier where you process events depending on their type, take a look at branch(). The branch() function lets you provide a fixed number of predicates to route messages into different sub-streams. You can then independently process these sub-streams, e.g. with the map() function. Finally, you can then send each sub-stream to a separate topic, with to().
KStream<String, Event>[] branches = events.branch(
    (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
    (id, event) -> event.getTransactionValue() < FRAUD_LIMIT);

branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);
You can also make truly dynamic routing decisions in the to() based on whatever is in the payload of an event. Here, the name of the output topic is derived from event data.
myStream.to(
(eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);
Furthermore, if the dynamic routing decision requires information that is not directly available in an event, one option you have is to dynamically enrich the original event with routing-related information (e.g., by joining the original event stream against a table with routing-related info), and then do the dynamic routing via to(). See https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/ for details.
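A minimal sketch of that last idea, reusing the Event type from the example above; the routing-table topic name, the getType() accessor, and the StreamsBuilder are illustrative assumptions, not something from the question:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// Hypothetical compacted topic keyed by event type; the value is the name of the target topic
KTable<String, String> routingTable = builder.table("routing-info-topic");

KStream<String, Event> events = builder.stream("events-topic");

events
    // re-key by type so the join key matches the routing table key (assumes Event has a getType())
    .selectKey((id, event) -> event.getType())
    // enrich each event with the target topic looked up from the table
    .join(routingTable, (event, targetTopic) -> KeyValue.pair(targetTopic, event))
    // dynamic routing: the topic name is taken from the joined value
    .to((type, enriched, recordContext) -> enriched.key);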
Upvotes: 3