Reputation: 1
In my DSL based transformation, I have a stream-->branch, where in I want branched output redirected to multiple topics.
Current branch.to()
method accepts only a String
.
Is there any simple option with stream.branch
where I can route the result to multiple topics. With a consumer, I can subscribe to multiple topics by providing an array of string as topics.
My problem requires me to take multiple actions if particular predicate satisfies a query.
I tried with stream.branch[index].to(string)
, but this is not sufficient for my requirement. I am looking for something like stream.branch[index].to(string array of topics)
or stream.branch[index].to(string)
.
I expect the branch.to
method with multiple topics or is there any alternate way to achieve the same with streams?
adding sample code.Removed actual variable names.
My Predicates
Predicate <String, MyDomainObject> Predicate1 = new Predicate<String, MyDomainObject>() {
@Override
public boolean test(String key, MyDomainObject domObj) {
boolean result = false;
if condition on domObj
return result;
}
};
Predicate <String, MyDomainObject> Predicate2 = new Predicate<String, MyDomainObject>() {
@Override
public boolean test(String key, MyDomainObject domObj) {
boolean result = false;
if condition on domObj
return result;
}
};
KStream <String, MyDomainObject>[] branches= myStream.branch(
Predicate1, Predicate2
);
// here I need your suggestions.
// this is my current implementation
branches[0].to(singleTopic),
Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));
// I want to send notification to multiple topics. something like below
branches[0].to(topicList),
Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));
Upvotes: 0
Views: 3790
Reputation: 62285
If you know to which topics you want to send the data, you can do the following:
branches[0].to("first-topic");
branches[0].to("second-topic");
// etc.
Upvotes: 1