Reputation: 2104
I am trying to add a StreamsUncaughtExceptionHandler to my Kafka Streams processor, which is written with Kafka functions. I looked at the suggestion provided by Artem Bilan for including the StreamsUncaughtExceptionHandler in my service, but my exceptions never get caught or handled by it.
Config Bean:
@Autowired
UnCaughtExceptionHandler exceptionHandler;
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
    return new StreamsBuilderFactoryBeanConfigurer() {
        @Override
        public void configure(StreamsBuilderFactoryBean factoryBean) {
            // Register the custom handler on the Kafka Streams factory bean.
            factoryBean.setStreamsUncaughtExceptionHandler(exceptionHandler);
        }
        @Override
        public int getOrder() {
            return Integer.MAX_VALUE;
        }
    };
}
Customized exception handler:
@Component
public class UnCaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
    @Autowired
    private StreamBridge streamBridge;
    @Override
    public StreamThreadExceptionResponse handle(Throwable exception) {
        return StreamThreadExceptionResponse.REPLACE_THREAD;
    }
}
Stream Process function:
@Autowired
private MyService service;
@Bean
public Function<KStream<String, Input>, KStream<String, Output>> processor() {
    final AtomicReference<KeyValue<String, Output>> result = new AtomicReference<>(null);
    return kStream -> kStream
        .filter((key, value) -> value != null)
        .filter((key, value) -> {
            Optional<Output> outputResult = service.process(value);
            if (outputResult.isPresent()) {
                result.set(new KeyValue<>(key, outputResult.get()));
                return true;
            }
            return false;
        })
        .map((messageKey, messageValue) -> result.get());
}
I expect UnCaughtExceptionHandler to handle any exception thrown by the service.process() method. Instead, exceptions never reach the handle method; they propagate to the root and kill the client. I had a look at this solution as well, but I want to handle it in a more independent way.
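As a point of comparison, the expected behavior can be sketched with the plain JDK Thread.UncaughtExceptionHandler rather than the Kafka Streams API. The names UncaughtHandlerSketch and runAndCapture are hypothetical, and Kafka Streams may route exceptions differently, so this only illustrates the expectation that an exception escaping the processing code ends up in the registered handler.

```java
import java.util.concurrent.atomic.AtomicReference;

// Plain-JDK analogue only; this does not use Kafka Streams or Spring.
final class UncaughtHandlerSketch {

    // Runs the task on a worker thread and returns whatever the thread's
    // uncaught exception handler received (null if the task completed normally).
    static Throwable runAndCapture(Runnable task) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        Thread worker = new Thread(task, "worker-1");
        worker.setUncaughtExceptionHandler((thread, error) -> seen.set(error));
        worker.start();
        try {
            // The handler runs on the dying thread, so it has finished once join() returns.
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = runAndCapture(() -> {
            // Stand-in for a failure inside service.process(value).
            throw new IllegalStateException("process failed");
        });
        System.out.println("handler saw: " + error);
    }
}
```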
Question: How do I handle any processing exceptions using StreamsUncaughtExceptionHandler?
Reproducible example: spring-cloud-kafka-streams-exception
Upvotes: 1
Views: 3216
Reputation: 5904
Here are a couple of things that you can try.
Try setting a breakpoint at this line in StreamsBuilderFactoryBean and see what the value of the configurer is. That should give some clues.
I noticed that you set Integer.MAX_VALUE for the order in your configurer implementation. By default, StreamsBuilderFactoryBean uses a phase value of Integer.MAX_VALUE - 1000, so there is a chance that by the time the factory bean is ready to start, the configurer is not available yet, since Integer.MAX_VALUE has lower precedence. You can change your order to something like Integer.MAX_VALUE - 5000 to ensure that the configurer bean is fully instantiated before the factory bean is started.
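To make the precedence argument concrete, here is a minimal sketch in plain Java (no Spring involved). It assumes the usual convention that a lower integer means higher precedence, and the Component class and the labels are hypothetical stand-ins. It does not show how Spring actually relates a configurer's order to the factory bean's phase; it only shows where the three values mentioned above land when sorted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class OrderPrecedenceSketch {

    // Hypothetical stand-in for anything that carries an integer order or phase value.
    static final class Component {
        final String name;
        final int order;

        Component(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns the component names sorted ascending by order value,
    // i.e. highest precedence first under the "lower value wins" convention.
    static List<String> sortedNames(List<Component> components) {
        List<Component> copy = new ArrayList<>(components);
        copy.sort(Comparator.comparingInt(c -> c.order));
        List<String> names = new ArrayList<>();
        for (Component c : copy) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        List<Component> components = Arrays.asList(
            new Component("configurer (MAX_VALUE)", Integer.MAX_VALUE),
            new Component("factory bean (MAX_VALUE - 1000)", Integer.MAX_VALUE - 1000),
            new Component("configurer (MAX_VALUE - 5000)", Integer.MAX_VALUE - 5000));
        // The MAX_VALUE - 5000 entry sorts ahead of the factory bean; plain MAX_VALUE sorts last.
        sortedNames(components).forEach(System.out::println);
    }
}
```

With the suggested change, the configurer's value sorts ahead of the factory bean's default instead of behind it.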
Start with those options and see if they give any indication of the cause. If the problem persists, feel free to share a small reproducible sample application with us.
Upvotes: 1