samshers

Reputation: 3690

kafka streams: publish/send messages even when a few record transformations throw exceptions?

A typical Kafka Streams application flow is as below (not including all steps like props/serdes etc.):

    final StreamsBuilder builder = new StreamsBuilder();

    final KStream<String, String> textLines = builder.stream(inputTopic);

    final KStream<String, String> textTransformation_1 = textLines.mapValues(value -> value + "firstTransformation");

    final KStream<String, String> textTransformation_2 = textTransformation_1.mapValues(value -> value + "secondTransformation");

    //my concern is at this stage -
    final KStream<String, String> textTransformation_3 = textTransformation_2.mapValues(this::processValueAndDoRelatedStuff);
    
    ....
    ....
    textTransformation_x.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
      

    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  

Now if the processValueAndDoRelatedStuff(String input) method throws an exception, I don't want the program to crash. I want Kafka Streams to simply NOT send that one record's transformation output to outputTopic (i.e. ignore the transformation of that one record) and continue processing the rest of the incoming messages normally.

Is the above possible?

In general, there is a way to skip sending a transformation's output to outputTopic based on a predicate: I can add a filter in the next stage. If processValueAndDoRelatedStuff(String input) catches the exception and returns some sentinel value, I can filter those records out in the next stage (sketched below):

    final KStream<String, String> textTransformation_4 = textTransformation_3.filter((k, v) -> !v.equals("badrecord"));
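For example, a minimal sketch of that catch-and-return idea inside the mapper (doTheActualWork is a hypothetical stand-in for my real logic, and "badrecord" is just the sentinel the filter above looks for):

    private String processValueAndDoRelatedStuff(String input) {
        try {
            return doTheActualWork(input); // hypothetical helper representing the real transformation
        } catch (Exception e) {
            // optionally log the failure here
            return "badrecord";            // sentinel value removed by the filter above
        }
    }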

But I am more interested in the case where the exception is not handled inside the mapper but thrown out of it. Is it possible for Kafka Streams to ignore the one record that caused the exception and still proceed with the rest of the processing?

Upvotes: 0

Views: 542

Answers (1)

OneCricketeer

Reputation: 192013

The default behavior is to stop the topology on any uncaught exception.

If you want to catch them, simply don't use a method reference. Wrap the call in a try-catch inside the lambda:

    final KStream<String, String> textTransformation_3 = textTransformation_2.mapValues(value -> {
      try {
        return processValueAndDoRelatedStuff(value);
      } catch (Exception e) {
        // log, if you want
        return null;
      }
    }).filter((k, v) -> Objects.nonNull(v)); // remove events that caused exceptions
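Returning null from mapValues keeps the record in the stream with a null value, so the trailing filter on Objects::nonNull is what actually drops the records that threw; all other records keep flowing through the topology untouched. (You'll need java.util.Objects imported for that.)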

Otherwise, you can configure exception handlers as well - https://developer.confluent.io/learn-kafka/kafka-streams/error-handling/
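A rough sketch of what that can look like, assuming Kafka Streams 2.8+ (note the built-in handlers cover deserialization and production errors, and the uncaught-exception handler only decides whether to replace the dying stream thread or shut down; it does not skip the record that failed inside your own mapper code):

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

    final Properties props = new Properties();
    // skip records that fail to deserialize instead of stopping the topology
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
              LogAndContinueExceptionHandler.class);
    // ... other required configs (application.id, bootstrap.servers, serdes) ...

    final KafkaStreams streams = new KafkaStreams(builder.build(), props);

    // decide what happens when a processing exception escapes the topology
    streams.setUncaughtExceptionHandler(exception ->
            StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);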

Upvotes: 2
