Reputation: 2167
Situation is the following.
We have setup SSL + ACLs in Kafka Broker.
We are setting up stream, which reads messages from two topics:
KStream<String, String> stringInput
= kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
stringInput
.filter( streamFilter::passOrFilterMessages )
.map( processor )
.to( outTopicName );
It is done like two times (in the loop). Then we are setting general error handler:
streams.setUncaughtExceptionHandler( ( Thread t, Throwable e ) -> {
synchronized ( this ) {
LOG.fatal( ... );
this.stop();
}
}
);
Problem is the following. If for example in one topic certificate is no more valid. The stream is throwing exception Not authorized to access topics ... So far so good.
But the exception is handled by general error handler, so the complete application stops even if the second topic has no problems.
The question is, how to handle this exception per topic? How to avoid situation that at some moment complete application stops due to the problem that one single topic has problems with authorization?
I understand that if Broker is not available, then complete app may stop. But if only one topic is not available, then single stream shall stop, and not complete application, or?
Upvotes: 0
Views: 6937
Reputation: 62285
By design, Kafka Streams treats the topology a one and cannot distinguish between both parts. For your specific case, as you loop and build to independent pipelines, you could run two KafkaStreams
instances in parallel (within the same application/JVM) to isolate both from each other. Thus, if one fails, the other one is not affected. You would need to use two different application.id
for both instances.
Upvotes: 1